A .editorconfig => .editorconfig +5 -0
@@ 0,0 1,5 @@
+root = true
+
+[*.rs]
+indent_style = space
+indent_size = 4
M linetest/src/main.rs => linetest/src/main.rs +14 -16
@@ 2,28 2,26 @@ use std::thread;
use futures_util::future;
-
mod shell;
fn main() -> anyhow::Result<()> {
- // create channels just to keep task thread alive
- //let (chan_tx, chan_rx) = async_channel::bounded(1024);
-
+ // create channels just to keep task thread alive
+ //let (chan_tx, chan_rx) = async_channel::bounded(1024);
- // run shell on its own dedicated thread
- let ui_handle = thread::spawn(move || { shell::ui::start_task() });
+ // run shell on its own dedicated thread
+ let ui_handle = thread::spawn(move || shell::ui::start_task());
- // start async worker pool
- // TODO: moar threads?
- let _threadpool_handle = thread::spawn(move || { smol::run(future::pending::<()>()) });
+ // start async worker pool
+ // TODO: moar threads?
+ let _threadpool_handle = thread::spawn(move || smol::run(future::pending::<()>()));
- // probably having a bad day if we get here ...
- if let Err(msg) = ui_handle.join() {
- panic!("err: {:?}", msg);
- }
+ // probably having a bad day if we get here ...
+ if let Err(msg) = ui_handle.join() {
+ panic!("err: {:?}", msg);
+ }
- // TODO: gracefully shutdown threadpool
- println!("goodbye ...");
+ // TODO: gracefully shutdown threadpool
+ println!("goodbye ...");
- Ok(())
+ Ok(())
}
M linetest/src/shell/mod.rs => linetest/src/shell/mod.rs +134 -126
@@ 1,4 1,4 @@
-use async_channel::{Sender, Receiver, TryRecvError};
+use async_channel::{Receiver, Sender, TryRecvError};
use std::collections::hash_map::HashMap;
use std::net::{SocketAddr, ToSocketAddrs};
use thiserror::Error;
@@ 8,159 8,167 @@ pub mod ui;
#[derive(Debug, Error)]
pub enum Error {
- #[error("could not parse at least one socket address.")]
- NoSocketAddr,
+ #[error("could not parse at least one socket address.")]
+ NoSocketAddr,
- #[error("command missing required argument")]
- MissingArgument,
+ #[error("command missing required argument")]
+ MissingArgument,
- #[error("could not parse the number in position: {}", pos)]
- ParseError {
- pos: &'static str,
+ #[error("could not parse the number in position: {}", pos)]
+ ParseError {
+ pos: &'static str,
- #[source]
- source: std::num::ParseIntError,
- },
+ #[source]
+ source: std::num::ParseIntError,
+ },
- #[error("specified connection does not exist!?")]
- MissingConnectionId,
+ #[error("specified connection does not exist!?")]
+ MissingConnectionId,
}
/// These are operations which can be exchanged between the terminal
/// interface and the network connection task.
pub enum ConnMsg {
- /// A line from the client to be shown in the top-level system log.
- LogLine { msg: String },
+ /// A line from the client to be shown in the top-level system log.
+ LogLine { msg: String },
- /// A line from the client to be sent/displayed in the corresponding buffer.
- /// Where `id` is the name of an associated room.
- BufLine { id: String, msg: String },
+ /// A line from the client to be sent/displayed in the corresponding buffer.
+ /// Where `id` is the name of an associated room.
+ BufLine { id: String, msg: String },
- /// The server rejected our connection (temporarily)
- RejectTemp { reason: String },
+ /// The server rejected our connection (temporarily)
+ RejectTemp { reason: String },
- /// Change username
- ChangeNick { name: String },
+ /// Change username
+ ChangeNick { name: String },
- /// Join room
- JoinRoom { room: String },
+ /// Join room
+ JoinRoom { room: String },
- /// Part room
- PartRoom { room: String },
+ /// Part room
+ PartRoom { room: String },
}
#[derive(Clone, Debug)]
pub struct Connection {
- id: i64,
- name: String,
- addr: SocketAddr,
- is_connected: bool,
+ id: i64,
+ name: String,
+ addr: SocketAddr,
+ is_connected: bool,
}
pub struct ConnectionMap {
- next_connection_id: i64,
- connections: HashMap<i64, Connection>,
-
- socket_rx: HashMap<i64, Receiver<ConnMsg>>,
- socket_tx: HashMap<i64, Sender<ConnMsg>>,
+ next_connection_id: i64,
+ connections: HashMap<i64, Connection>,
+ socket_rx: HashMap<i64, Receiver<ConnMsg>>,
+ socket_tx: HashMap<i64, Sender<ConnMsg>>,
}
/// This is the mailbox which should be passed to the new connection task.
/// This allows the UI thread & network thread to communicate bidirectionally
/// by exchaning `ConnMsg` structs.
pub struct ConnectionMailbox {
- outbox: Sender<ConnMsg>,
- inbox: Receiver<ConnMsg>,
-
+ outbox: Sender<ConnMsg>,
+ inbox: Receiver<ConnMsg>,
}
impl ConnectionMap {
- pub fn new() -> Self {
- Self {
- next_connection_id: 0,
- connections: HashMap::new(),
- socket_rx: HashMap::new(),
- socket_tx: HashMap::new(),
- }
- }
-
- /// Creates a pending connection and returns the corresponding
- /// connection id & channel.
- pub fn create_connection<Sock: ToSocketAddrs>(&mut self, addr: Sock, name: &str) -> anyhow::Result<(i64, ConnectionMailbox)> {
- // fetch an ID & increment the counter
- let next_id = self.next_connection_id;
- self.next_connection_id += 1;
-
- let addr = addr.to_socket_addrs()?
- .next()
- .ok_or_else(|| { Error::NoSocketAddr })?;
-
- // create a new connection
- let (net_outbox_tx, net_outbox_rx) = async_channel::bounded(1024);
- let (net_inbox_tx, net_inbox_rx) = async_channel::bounded(1024);
-
- let connection = Connection {
- id: next_id,
- name: name.to_string(),
- addr: addr,
- is_connected: false,
- };
-
- if let Some(_) = self.connections.insert(next_id, connection) {
- // TODO: handle the collision?
- panic!("BUG: collision in connection map.");
- }
-
- if let Some(_) = self.socket_rx.insert(next_id, net_outbox_rx) {
- // TODO: handle the collision?
- panic!("BUG: collision in connection map.");
- }
-
- if let Some(_) = self.socket_tx.insert(next_id, net_inbox_tx) {
- // TODO: handle the collision?
- panic!("BUG: collision in connection map.");
- }
-
- Ok((next_id, ConnectionMailbox { outbox: net_outbox_tx, inbox: net_inbox_rx }))
- }
-
- pub fn poll_messages(&mut self) -> HashMap<i64, Vec<ConnMsg>> {
- let mut messages = HashMap::new();
-
- let mut dead_peers = vec![];
-
- // drain messages for each connection, non-blocking
- for (id, conn) in &self.socket_rx {
- let mut msg_q = vec![];
-
- loop {
- match conn.try_recv() {
- Ok(msg) => msg_q.push(msg),
- Err(TryRecvError::Empty) => break,
- Err(TryRecvError::Closed) => {
- dead_peers.push(*id);
- break;
- },
- }
- }
-
- if let Some(_) = messages.insert(*id, msg_q) {
- panic!("BUG: collision in incoming messages map");
- }
- }
-
- self.reap_dead_peers(&dead_peers);
-
- return messages;
- }
-
- fn reap_dead_peers(&mut self, ids: &Vec<i64>) {
- for peer_id in ids {
- self.socket_rx.remove(&peer_id);
- self.socket_tx.remove(&peer_id);
- self.connections.remove(&peer_id);
- }
-
- }
-}>
\ No newline at end of file
+ pub fn new() -> Self {
+ Self {
+ next_connection_id: 0,
+ connections: HashMap::new(),
+ socket_rx: HashMap::new(),
+ socket_tx: HashMap::new(),
+ }
+ }
+
+ /// Creates a pending connection and returns the corresponding
+ /// connection id & channel.
+ pub fn create_connection<Sock: ToSocketAddrs>(
+ &mut self,
+ addr: Sock,
+ name: &str,
+ ) -> anyhow::Result<(i64, ConnectionMailbox)> {
+ // fetch an ID & increment the counter
+ let next_id = self.next_connection_id;
+ self.next_connection_id += 1;
+
+ let addr = addr
+ .to_socket_addrs()?
+ .next()
+ .ok_or_else(|| Error::NoSocketAddr)?;
+
+ // create a new connection
+ let (net_outbox_tx, net_outbox_rx) = async_channel::bounded(1024);
+ let (net_inbox_tx, net_inbox_rx) = async_channel::bounded(1024);
+
+ let connection = Connection {
+ id: next_id,
+ name: name.to_string(),
+ addr: addr,
+ is_connected: false,
+ };
+
+ if let Some(_) = self.connections.insert(next_id, connection) {
+ // TODO: handle the collision?
+ panic!("BUG: collision in connection map.");
+ }
+
+ if let Some(_) = self.socket_rx.insert(next_id, net_outbox_rx) {
+ // TODO: handle the collision?
+ panic!("BUG: collision in connection map.");
+ }
+
+ if let Some(_) = self.socket_tx.insert(next_id, net_inbox_tx) {
+ // TODO: handle the collision?
+ panic!("BUG: collision in connection map.");
+ }
+
+ Ok((
+ next_id,
+ ConnectionMailbox {
+ outbox: net_outbox_tx,
+ inbox: net_inbox_rx,
+ },
+ ))
+ }
+
+ pub fn poll_messages(&mut self) -> HashMap<i64, Vec<ConnMsg>> {
+ let mut messages = HashMap::new();
+
+ let mut dead_peers = vec![];
+
+ // drain messages for each connection, non-blocking
+ for (id, conn) in &self.socket_rx {
+ let mut msg_q = vec![];
+
+ loop {
+ match conn.try_recv() {
+ Ok(msg) => msg_q.push(msg),
+ Err(TryRecvError::Empty) => break,
+ Err(TryRecvError::Closed) => {
+ dead_peers.push(*id);
+ break;
+ }
+ }
+ }
+
+ if let Some(_) = messages.insert(*id, msg_q) {
+ panic!("BUG: collision in incoming messages map");
+ }
+ }
+
+ self.reap_dead_peers(&dead_peers);
+
+ return messages;
+ }
+
+ fn reap_dead_peers(&mut self, ids: &Vec<i64>) {
+ for peer_id in ids {
+ self.socket_rx.remove(&peer_id);
+ self.socket_tx.remove(&peer_id);
+ self.connections.remove(&peer_id);
+ }
+ }
+}
M linetest/src/shell/net.rs => linetest/src/shell/net.rs +385 -375
@@ 1,418 1,428 @@
-use crate::shell::{self, Connection, ConnMsg};
+use crate::shell::{self, ConnMsg, Connection};
use smolboi::protocol::packet::Packet;
use std::net::TcpStream;
use async_channel::Sender;
use async_dup::Arc;
-use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian};
+use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
use futures_util::{select, AsyncReadExt, AsyncWriteExt, StreamExt};
use smol::Async;
use thiserror::Error;
#[derive(Debug, Error)]
enum ClientError {
- #[error("an unspecified error has occurred.")]
- Unspecified,
+ #[error("an unspecified error has occurred.")]
+ Unspecified,
- #[error("the packet was well-formed, but not expected at this time.")]
- InvalidState,
+ #[error("the packet was well-formed, but not expected at this time.")]
+ InvalidState,
}
#[derive(Debug, Eq, PartialEq)]
enum ClientState {
- Connecting,
- Connected,
+ Connecting,
+ Connected,
}
#[derive(Debug)]
struct Client<S> {
- conn: Connection,
- outbox: Sender<ConnMsg>,
- sock: S,
- state: ClientState,
+ conn: Connection,
+ outbox: Sender<ConnMsg>,
+ sock: S,
+ state: ClientState,
}
-impl<S> Client<S>
-where S: AsyncReadExt + AsyncWriteExt + Unpin {
- pub fn with(conn: Connection, sock: S, outbox: Sender<ConnMsg>) -> Self {
- Self {
- conn: conn,
- sock: sock,
- outbox: outbox,
- state: ClientState::Connecting,
- }
- }
-
- /// Updates client state based on incoming packet.
- /// The state change will be relayed to the parent task which created this client
- /// by way of the `outbox` parameter supplied to the `with` constructor.
- pub async fn handle_net_pkt(&mut self, pkt: Packet) -> anyhow::Result<()> {
- match self.state {
- ClientState::Connecting => match pkt {
- Packet::RegistrationAccepted { name } => {
- // !!! use the name the server accepted !!!
- //
- // this avoids race where user tries to change nick before
- // the server finishes sending the accept/reject packet for
- // the outstanding request.
- self.conn.name = name;
- self.state = ClientState::Connected;
-
- let log_line = ConnMsg::LogLine { msg: "registered ... ok.".to_string() };
- self.outbox.send(log_line).await?;
- Ok(())
- },
-
- Packet::RegistrationRejected => {
- let msg = ConnMsg::RejectTemp {
- reason: "[server]: registration rejected. try another username.".to_string()
- };
-
- Ok(self.outbox.send(msg).await?)
- },
-
- Packet::Notice { body } => {
- let msg = ConnMsg::LogLine { msg: format!("wtf: {}", body) };
- Ok(self.outbox.send(msg).await?)
- },
-
- _ => Err(ClientError::InvalidState.into()),
- },
-
- ClientState::Connected => match pkt {
- Packet::MessageRoom { room, from, body } => {
- let msg = ConnMsg::BufLine {
- id: room,
- msg: format!("{}: {}", from, body),
- };
-
- Ok(self.outbox.send(msg).await?)
- },
-
- Packet::Notice { body } => {
- let msg = ConnMsg::LogLine { msg: body };
- Ok(self.outbox.send(msg).await?)
- },
-
- _ => Err(ClientError::InvalidState.into()),
- },
- }
- }
-
- pub async fn handle_shell_msg(&mut self, msg: ConnMsg) -> anyhow::Result<()> {
- match self.state {
- ClientState::Connecting => match msg {
- ConnMsg::ChangeNick { name } => {
- Ok(write_packet(&mut self.sock, Packet::Register { name: name }).await?)
- },
-
- _ => Err(ClientError::Unspecified.into()),
- },
-
- ClientState::Connected => match msg {
- ConnMsg::BufLine { id, msg } => {
- let packet = Packet::MessageRoom {
- room: id,
- from: self.conn.name.clone(),
- body: msg,
- };
-
- Ok(write_packet(&mut self.sock, packet).await?)
- },
-
- ConnMsg::JoinRoom { room } => {
- Ok(write_packet(&mut self.sock, Packet::Join { room: room }).await?)
- },
-
- ConnMsg::PartRoom { room } => {
- Ok(write_packet(&mut self.sock, Packet::Part { room: room }).await?)
- },
-
- _ => Err(ClientError::Unspecified.into()),
- },
- }
- }
+impl<S> Client<S>
+where
+ S: AsyncReadExt + AsyncWriteExt + Unpin,
+{
+ pub fn with(conn: Connection, sock: S, outbox: Sender<ConnMsg>) -> Self {
+ Self {
+ conn: conn,
+ sock: sock,
+ outbox: outbox,
+ state: ClientState::Connecting,
+ }
+ }
+
+ /// Updates client state based on incoming packet.
+ /// The state change will be relayed to the parent task which created this client
+ /// by way of the `outbox` parameter supplied to the `with` constructor.
+ pub async fn handle_net_pkt(&mut self, pkt: Packet) -> anyhow::Result<()> {
+ match self.state {
+ ClientState::Connecting => match pkt {
+ Packet::RegistrationAccepted { name } => {
+ // !!! use the name the server accepted !!!
+ //
+ // this avoids race where user tries to change nick before
+ // the server finishes sending the accept/reject packet for
+ // the outstanding request.
+ self.conn.name = name;
+ self.state = ClientState::Connected;
+
+ let log_line = ConnMsg::LogLine {
+ msg: "registered ... ok.".to_string(),
+ };
+ self.outbox.send(log_line).await?;
+ Ok(())
+ }
+
+ Packet::RegistrationRejected => {
+ let msg = ConnMsg::RejectTemp {
+ reason: "[server]: registration rejected. try another username."
+ .to_string(),
+ };
+
+ Ok(self.outbox.send(msg).await?)
+ }
+
+ Packet::Notice { body } => {
+ let msg = ConnMsg::LogLine {
+ msg: format!("wtf: {}", body),
+ };
+ Ok(self.outbox.send(msg).await?)
+ }
+
+ _ => Err(ClientError::InvalidState.into()),
+ },
+
+ ClientState::Connected => match pkt {
+ Packet::MessageRoom { room, from, body } => {
+ let msg = ConnMsg::BufLine {
+ id: room,
+ msg: format!("{}: {}", from, body),
+ };
+
+ Ok(self.outbox.send(msg).await?)
+ }
+
+ Packet::Notice { body } => {
+ let msg = ConnMsg::LogLine { msg: body };
+ Ok(self.outbox.send(msg).await?)
+ }
+
+ _ => Err(ClientError::InvalidState.into()),
+ },
+ }
+ }
+
+ pub async fn handle_shell_msg(&mut self, msg: ConnMsg) -> anyhow::Result<()> {
+ match self.state {
+ ClientState::Connecting => match msg {
+ ConnMsg::ChangeNick { name } => {
+ Ok(write_packet(&mut self.sock, Packet::Register { name: name }).await?)
+ }
+
+ _ => Err(ClientError::Unspecified.into()),
+ },
+
+ ClientState::Connected => match msg {
+ ConnMsg::BufLine { id, msg } => {
+ let packet = Packet::MessageRoom {
+ room: id,
+ from: self.conn.name.clone(),
+ body: msg,
+ };
+
+ Ok(write_packet(&mut self.sock, packet).await?)
+ }
+
+ ConnMsg::JoinRoom { room } => {
+ Ok(write_packet(&mut self.sock, Packet::Join { room: room }).await?)
+ }
+
+ ConnMsg::PartRoom { room } => {
+ Ok(write_packet(&mut self.sock, Packet::Part { room: room }).await?)
+ }
+
+ _ => Err(ClientError::Unspecified.into()),
+ },
+ }
+ }
}
async fn write_packet<W>(wtr: &mut W, packet: Packet) -> anyhow::Result<()>
- where W: AsyncWriteExt + Unpin {
- let packet_buf = packet.encode()?;
- let packet_len = packet_buf.len();
- assert!(packet_len <= u16::MAX.into());
-
- let mut packet_sz = [0u8; 2];
- (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_len as u16)?;
-
- wtr.write(&packet_sz).await?;
- wtr.write(&packet_buf).await?;
- Ok(())
- }
-
-async fn decode_packet<R>(stream: &mut R) -> anyhow::Result<Packet>
-where R: AsyncReadExt + Unpin {
- let mut packet_header_buf = [0u8; 2];
- stream.read_exact(&mut packet_header_buf).await?;
-
- let packet_sz = (&packet_header_buf[..]).read_u16::<NetworkEndian>()?;
-
- assert!(packet_sz <= u16::MAX);
- let mut packet_body_buf = vec![0u8; packet_sz.into()];
- stream.read_exact(&mut packet_body_buf).await?;
-
- Ok(Packet::decode(&packet_body_buf)?)
+where
+ W: AsyncWriteExt + Unpin,
+{
+ let packet_buf = packet.encode()?;
+ let packet_len = packet_buf.len();
+ assert!(packet_len <= u16::MAX.into());
+
+ let mut packet_sz = [0u8; 2];
+ (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_len as u16)?;
+
+ wtr.write(&packet_sz).await?;
+ wtr.write(&packet_buf).await?;
+ Ok(())
+}
+
+async fn decode_packet<R>(stream: &mut R) -> anyhow::Result<Packet>
+where
+ R: AsyncReadExt + Unpin,
+{
+ let mut packet_header_buf = [0u8; 2];
+ stream.read_exact(&mut packet_header_buf).await?;
+
+ let packet_sz = (&packet_header_buf[..]).read_u16::<NetworkEndian>()?;
+
+ assert!(packet_sz <= u16::MAX);
+ let mut packet_body_buf = vec![0u8; packet_sz.into()];
+ stream.read_exact(&mut packet_body_buf).await?;
+
+ Ok(Packet::decode(&packet_body_buf)?)
}
/// Starts a connection to the server specified in the background.
/// This returns control to the caller once the task has been sent to the
/// executor, at which point the caller can begin polling the associated
/// task mailbox.
-///
+///
/// Returns an error in the event the TCP socket could not be opeend.
pub async fn start_task(
- conn: Connection,
- shell_mx: shell::ConnectionMailbox,
+ conn: Connection,
+ shell_mx: shell::ConnectionMailbox,
) -> anyhow::Result<()> {
- let sock = Arc::new(Async::<TcpStream>::connect(conn.addr).await?);
-
- smol::Task::spawn(async {
- let task_outbox = shell_mx.outbox.clone();
- if let Err(msg) = net_worker_task(conn, sock, shell_mx).await {
- let error_log_ln = format!("BUG: net worker exited unexpectedly {:?}", msg);
- let _ = task_outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
- }
- }).detach();
-
- Ok(())
+ let sock = Arc::new(Async::<TcpStream>::connect(conn.addr).await?);
+
+ smol::Task::spawn(async {
+ let task_outbox = shell_mx.outbox.clone();
+ if let Err(msg) = net_worker_task(conn, sock, shell_mx).await {
+ let error_log_ln = format!("BUG: net worker exited unexpectedly {:?}", msg);
+ let _ = task_outbox
+ .send(ConnMsg::LogLine { msg: error_log_ln })
+ .await;
+ }
+ })
+ .detach();
+
+ Ok(())
}
async fn net_worker_task(
- conn: Connection,
- mut sock: Arc<Async<TcpStream>>,
- shell_mx: shell::ConnectionMailbox
+ conn: Connection,
+ mut sock: Arc<Async<TcpStream>>,
+ shell_mx: shell::ConnectionMailbox,
) -> anyhow::Result<()> {
- // start packet decoding thread
- let (packets_tx, packets_rx) = async_channel::unbounded();
- let (quit_tx, quit_rx) = async_channel::unbounded::<void::Void>();
-
- // copy these fields so they don't move into task
- let mut packet_decoding_sock = sock.clone();
- let packet_shell_outbox = shell_mx.outbox.clone();
- let packet_conn_id = conn.id;
-
- smol::Task::spawn(async move {
- loop {
- match decode_packet(&mut packet_decoding_sock).await {
- Ok(packet) => {
- if let Err(_) = packets_tx.send(packet).await {
- panic!("BUG: sent packet but connection event loop is gone?");
- }
- },
-
- Err(msg) => {
-
- let log_line = ConnMsg::LogLine {
- msg: format!("FATAL: conn({}) received malformed packet: {:?}.", packet_conn_id, msg),
- };
-
- let _ = packet_shell_outbox.send(log_line).await;
- break;
- },
- }
- }
-
- drop(quit_tx);
- }).detach();
-
- // send out registration packet
- let packet = Packet::Register {
- name: conn.name.to_string(),
- };
-
- let packet_buf = packet.encode()?;
- if packet_buf.len() > u16::MAX.into() {
- panic!("BUG: user crafted >64KiB packet, this is not legal.");
- } // TODO: use that fancy `anyhow` error macro?
-
- let mut packet_sz = [0u8; 2];
- (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;
-
- sock.write(&packet_sz).await?;
- sock.write(&packet_buf).await?;
-
-
- let mut packets_rx = packets_rx.fuse();
- let mut task_rx = shell_mx.inbox.fuse();
- let mut quit_rx = quit_rx.fuse();
-
- let mut client = Client::with(conn, sock.clone(), shell_mx.outbox.clone());
-
- loop {
- select! {
- packet = packets_rx.next() => match packet {
- Some(packet) => {
- if let Err(msg) = client.handle_net_pkt(packet).await {
- let error_log_ln = format!("client error: {:?}", msg);
- let _ = shell_mx.outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
- }
- },
-
- None => { break },
- },
-
- task_msg = task_rx.next() => match task_msg {
- Some(task_msg) => {
- if let Err(msg) = client.handle_shell_msg(task_msg).await {
- let error_log_ln = format!("client error: {:?}", msg);
- let _ = shell_mx.outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
- }
- },
-
- None => { break },
- },
-
- quit = quit_rx.next() => if quit.is_none() { break },
- }
- }
-
- let log_line = ConnMsg::LogLine { msg: "warning! net worker hung up!".to_string() };
- Ok(shell_mx.outbox.send(log_line).await?)
+ // start packet decoding thread
+ let (packets_tx, packets_rx) = async_channel::unbounded();
+ let (quit_tx, quit_rx) = async_channel::unbounded::<void::Void>();
+
+ // copy these fields so they don't move into task
+ let mut packet_decoding_sock = sock.clone();
+ let packet_shell_outbox = shell_mx.outbox.clone();
+ let packet_conn_id = conn.id;
+
+ smol::Task::spawn(async move {
+ loop {
+ match decode_packet(&mut packet_decoding_sock).await {
+ Ok(packet) => {
+ if let Err(_) = packets_tx.send(packet).await {
+ panic!("BUG: sent packet but connection event loop is gone?");
+ }
+ }
+
+ Err(msg) => {
+ let log_line = ConnMsg::LogLine {
+ msg: format!(
+ "FATAL: conn({}) received malformed packet: {:?}.",
+ packet_conn_id, msg
+ ),
+ };
+
+ let _ = packet_shell_outbox.send(log_line).await;
+ break;
+ }
+ }
+ }
+
+ drop(quit_tx);
+ })
+ .detach();
+
+ // send out registration packet
+ let packet = Packet::Register {
+ name: conn.name.to_string(),
+ };
+
+ let packet_buf = packet.encode()?;
+ if packet_buf.len() > u16::MAX.into() {
+ panic!("BUG: user crafted >64KiB packet, this is not legal.");
+ } // TODO: use that fancy `anyhow` error macro?
+
+ let mut packet_sz = [0u8; 2];
+ (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;
+
+ sock.write(&packet_sz).await?;
+ sock.write(&packet_buf).await?;
+
+ let mut packets_rx = packets_rx.fuse();
+ let mut task_rx = shell_mx.inbox.fuse();
+ let mut quit_rx = quit_rx.fuse();
+
+ let mut client = Client::with(conn, sock.clone(), shell_mx.outbox.clone());
+
+ loop {
+ select! {
+ packet = packets_rx.next() => match packet {
+ Some(packet) => {
+ if let Err(msg) = client.handle_net_pkt(packet).await {
+ let error_log_ln = format!("client error: {:?}", msg);
+ let _ = shell_mx.outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
+ }
+ },
+
+ None => { break },
+ },
+
+ task_msg = task_rx.next() => match task_msg {
+ Some(task_msg) => {
+ if let Err(msg) = client.handle_shell_msg(task_msg).await {
+ let error_log_ln = format!("client error: {:?}", msg);
+ let _ = shell_mx.outbox.send(ConnMsg::LogLine { msg: error_log_ln }).await;
+ }
+ },
+
+ None => { break },
+ },
+
+ quit = quit_rx.next() => if quit.is_none() { break },
+ }
+ }
+
+ let log_line = ConnMsg::LogLine {
+ msg: "warning! net worker hung up!".to_string(),
+ };
+ Ok(shell_mx.outbox.send(log_line).await?)
}
#[cfg(test)]
mod tests {
- use async_channel::unbounded;
- use core::pin::Pin;
- use futures_util::io::{self, AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
- use futures_util::task::{Context, Poll};
- use sluice::pipe::{pipe, PipeReader, PipeWriter};
- use super::*;
-
-
- struct FakeTcpStream {
- rx: PipeReader,
- tx: PipeWriter,
- }
-
- impl FakeTcpStream {
- pub fn new() -> (Self, Self) {
- let (rx_a, tx_a) = pipe();
- let (rx_b, tx_b) = pipe();
-
- // create a "crossover" cable
- let socket_a = Self { rx: rx_b, tx: tx_a };
- let socket_b = Self { rx: rx_a, tx: tx_b };
-
- (socket_a, socket_b)
- }
- }
-
- impl AsyncRead for FakeTcpStream {
- fn poll_read(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &mut [u8]
- ) -> Poll<io::Result<usize>> {
- PipeReader::poll_read(Pin::new(&mut self.rx), cx, buf)
- }
-
- fn poll_read_vectored(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- bufs: &mut [IoSliceMut<'_>]
- ) -> Poll<io::Result<usize>> {
- PipeReader::poll_read_vectored(Pin::new(&mut self.rx), cx, bufs)
- }
- }
-
- impl AsyncWrite for FakeTcpStream {
- fn poll_write(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- buf: &[u8]
- ) -> Poll<io::Result<usize>> {
- PipeWriter::poll_write(Pin::new(&mut self.tx), cx, buf)
- }
-
- fn poll_write_vectored(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>,
- bufs: &[IoSlice<'_>]
- ) -> Poll<io::Result<usize>> {
- PipeWriter::poll_write_vectored(Pin::new(&mut self.tx), cx, bufs)
- }
-
- fn poll_flush(
- mut self: Pin<&mut Self>,
- cx: &mut Context<'_>
- ) -> Poll<io::Result<()>> {
- PipeWriter::poll_flush(Pin::new(&mut self.tx), cx)
- }
-
- fn poll_close(mut self: Pin<&mut Self>,
- cx: &mut Context<'_>
- ) -> Poll<io::Result<()>> {
- PipeWriter::poll_close(Pin::new(&mut self.tx), cx)
- }
- }
-
-
- const CLIENT_NAME: &'static str = "Chuck Testa";
- const CLIENT_ADDR: &'static str = "127.0.0.1:1234";
-
- fn init_fake_connection() -> Connection {
- Connection {
- id: 0,
- name: CLIENT_NAME.to_string(),
- addr: CLIENT_ADDR.parse().unwrap(),
- is_connected: false,
- }
- }
-
- #[test]
- fn client_accepted_registration() -> anyhow::Result<()> {
- // initial setup
- let (client_sock, _server_sock) = FakeTcpStream::new();
- let (mx_tx, _mx_rx) = unbounded();
-
- // an initialized client should not be considered connected ...
- let mut client = Client::with(init_fake_connection(), client_sock, mx_tx);
- assert_eq!(client.state, ClientState::Connecting);
-
- // an accepted registration should result in a state change
- let resp = Packet::RegistrationAccepted {
- name: "Chuck Testa".to_string()
- };
-
- smol::block_on(client.handle_net_pkt(resp))?;
- assert_eq!(client.state, ClientState::Connected);
-
- Ok(())
- }
-
- #[test]
- fn client_rejected_registration() -> anyhow::Result<()> {
- // initial setup
- let (client_sock, mut server_sock) = FakeTcpStream::new();
- let (mx_tx, _mx_rx) = unbounded();
- let mut client = Client::with(init_fake_connection(), client_sock, mx_tx);
-
- // rejected client should still be in Connecting state
- smol::block_on(client.handle_net_pkt(Packet::RegistrationRejected))?;
- assert_eq!(client.state, ClientState::Connecting);
-
- // tell client to change nick
- let new_name = "newer_more_different_name";
- smol::block_on(client.handle_shell_msg(ConnMsg::ChangeNick {
- name: new_name.to_string()
- }))?;
-
- // the "server" should have received a registration packet for the new name
- let req = smol::block_on(smolboi::util::read_packet(&mut server_sock.rx))?;
- if let Packet::Register { name } = req {
- assert_eq!(name, new_name);
- }
-
- Ok(())
- }
-
+ use super::*;
+ use async_channel::unbounded;
+ use core::pin::Pin;
+ use futures_util::io::{self, AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
+ use futures_util::task::{Context, Poll};
+ use sluice::pipe::{pipe, PipeReader, PipeWriter};
+
+ struct FakeTcpStream {
+ rx: PipeReader,
+ tx: PipeWriter,
+ }
+
+ impl FakeTcpStream {
+ pub fn new() -> (Self, Self) {
+ let (rx_a, tx_a) = pipe();
+ let (rx_b, tx_b) = pipe();
+
+ // create a "crossover" cable
+ let socket_a = Self { rx: rx_b, tx: tx_a };
+ let socket_b = Self { rx: rx_a, tx: tx_b };
+
+ (socket_a, socket_b)
+ }
+ }
+
+ impl AsyncRead for FakeTcpStream {
+ fn poll_read(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &mut [u8],
+ ) -> Poll<io::Result<usize>> {
+ PipeReader::poll_read(Pin::new(&mut self.rx), cx, buf)
+ }
+
+ fn poll_read_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &mut [IoSliceMut<'_>],
+ ) -> Poll<io::Result<usize>> {
+ PipeReader::poll_read_vectored(Pin::new(&mut self.rx), cx, bufs)
+ }
+ }
+
+ impl AsyncWrite for FakeTcpStream {
+ fn poll_write(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ buf: &[u8],
+ ) -> Poll<io::Result<usize>> {
+ PipeWriter::poll_write(Pin::new(&mut self.tx), cx, buf)
+ }
+
+ fn poll_write_vectored(
+ mut self: Pin<&mut Self>,
+ cx: &mut Context<'_>,
+ bufs: &[IoSlice<'_>],
+ ) -> Poll<io::Result<usize>> {
+ PipeWriter::poll_write_vectored(Pin::new(&mut self.tx), cx, bufs)
+ }
+
+ fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ PipeWriter::poll_flush(Pin::new(&mut self.tx), cx)
+ }
+
+ fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
+ PipeWriter::poll_close(Pin::new(&mut self.tx), cx)
+ }
+ }
+
+ const CLIENT_NAME: &'static str = "Chuck Testa";
+ const CLIENT_ADDR: &'static str = "127.0.0.1:1234";
+
+ fn init_fake_connection() -> Connection {
+ Connection {
+ id: 0,
+ name: CLIENT_NAME.to_string(),
+ addr: CLIENT_ADDR.parse().unwrap(),
+ is_connected: false,
+ }
+ }
+
+ #[test]
+ fn client_accepted_registration() -> anyhow::Result<()> {
+ // initial setup
+ let (client_sock, _server_sock) = FakeTcpStream::new();
+ let (mx_tx, _mx_rx) = unbounded();
+
+ // an initialized client should not be considered connected ...
+ let mut client = Client::with(init_fake_connection(), client_sock, mx_tx);
+ assert_eq!(client.state, ClientState::Connecting);
+
+ // an accepted registration should result in a state change
+ let resp = Packet::RegistrationAccepted {
+ name: "Chuck Testa".to_string(),
+ };
+
+ smol::block_on(client.handle_net_pkt(resp))?;
+ assert_eq!(client.state, ClientState::Connected);
+
+ Ok(())
+ }
+
+ #[test]
+ fn client_rejected_registration() -> anyhow::Result<()> {
+ // initial setup
+ let (client_sock, mut server_sock) = FakeTcpStream::new();
+ let (mx_tx, _mx_rx) = unbounded();
+ let mut client = Client::with(init_fake_connection(), client_sock, mx_tx);
+
+ // rejected client should still be in Connecting state
+ smol::block_on(client.handle_net_pkt(Packet::RegistrationRejected))?;
+ assert_eq!(client.state, ClientState::Connecting);
+
+ // tell client to change nick
+ let new_name = "newer_more_different_name";
+ smol::block_on(client.handle_shell_msg(ConnMsg::ChangeNick {
+ name: new_name.to_string(),
+ }))?;
+
+ // the "server" should have received a registration packet for the new name
+ let req = smol::block_on(smolboi::util::read_packet(&mut server_sock.rx))?;
+ if let Packet::Register { name } = req {
+ assert_eq!(name, new_name);
+ }
+
+ Ok(())
+ }
}
M linetest/src/shell/ui/buffer.rs => linetest/src/shell/ui/buffer.rs +339 -323
@@ 1,350 1,366 @@
+use super::{COLOR_TITLE_BG, COLOR_TITLE_FG};
use crossterm::{
- cursor,
- style::{self, Print},
- terminal::{Clear, ClearType},
- QueueableCommand,
+ cursor,
+ style::{self, Print},
+ terminal::{Clear, ClearType},
+ QueueableCommand,
};
use std::cmp;
use std::io;
-use super::{COLOR_TITLE_BG, COLOR_TITLE_FG};
use textwrap;
struct Span {
- line_idx: usize,
- lines: Vec<String>,
+ line_idx: usize,
+ lines: Vec<String>,
}
impl Span {
- pub fn new(idx: usize, buf: &str, cols: u16) -> Self {
- let wrapped_str = textwrap::fill(buf, cols.into());
-
- let lines = wrapped_str
- .lines()
- .map(|line| { line.to_owned() })
- .collect::<Vec<_>>();
-
- Self { line_idx: idx, lines: lines }
- }
-
- pub fn line_iter(&self) -> impl DoubleEndedIterator<Item = &String> {
- self.lines.iter()
- }
+ pub fn new(idx: usize, buf: &str, cols: u16) -> Self {
+ let wrapped_str = textwrap::fill(buf, cols.into());
+
+ let lines = wrapped_str
+ .lines()
+ .map(|line| line.to_owned())
+ .collect::<Vec<_>>();
+
+ Self {
+ line_idx: idx,
+ lines: lines,
+ }
+ }
+
+ pub fn line_iter(&self) -> impl DoubleEndedIterator<Item = &String> {
+ self.lines.iter()
+ }
}
-/// The `Buffer` stores a list of lines which need to be displayed
+/// The `Buffer` stores a list of lines which need to be displayed
/// in a fixed-width context. Lines can be added to the buffer and
/// then displayed at any arbitrary resolution later.
pub struct Buffer {
- cols: u16,
- rows: u16,
+ cols: u16,
+ rows: u16,
- lines: Vec<String>,
- current_line_ptr: usize,
- max_line_ptr: usize,
+ lines: Vec<String>,
+ current_line_ptr: usize,
+ max_line_ptr: usize,
- needs_repaint: bool,
+ needs_repaint: bool,
}
impl Buffer {
- pub fn new(cols: u16, rows: u16) -> Self {
- Self {
- cols: cols,
- rows: rows,
-
- lines: vec![],
- current_line_ptr: 0,
- max_line_ptr: 0,
-
- needs_repaint: true,
- }
- }
-
- pub fn resise(&mut self, cols: u16, rows: u16) {
- self.needs_repaint = true;
- self.cols = cols;
- self.rows = rows;
-
- // If there are more lines that could be shown we push the current line pointer
- // down in an effort to fill the window w/ the add'l lines.
- let delta_buffer = self.lines.len() - self.current_line_ptr;
-
- let rows: usize = self.rows.into();
- if rows > delta_buffer {
- self.current_line_ptr -= cmp::min(self.current_line_ptr, rows - delta_buffer);
- }
- }
-
- pub fn append_line(&mut self, line: String) {
- self.lines.push(line);
- self.needs_repaint = true;
- }
-
- pub fn scroll_up(&mut self) {
- let new_line_ptr = self.current_line_ptr + 1;
-
- if new_line_ptr <= self.lines.len() && self.max_line_ptr > 0 {
- self.current_line_ptr += 1;
- self.needs_repaint = true;
- }
- }
-
- pub fn scroll_down(&mut self) {
- if self.current_line_ptr > 0 {
- self.current_line_ptr -= 1;
- self.needs_repaint = true;
- }
- }
-
- /// The caller *must* set the cursor to the row where this buffer should be rendered.
- /// The buffer will ensure that it paints _at most_ `self.rows` lines onto the terminal.
- /// NOTE: this is a no-op if the buffer does not need to be repainted.
- pub fn render(&mut self, stdout: &mut std::io::Stdout) -> anyhow::Result<()> {
- if !self.needs_repaint { return Ok(()); }
- self.needs_repaint = false;
-
- // move to bottom-most row.
- stdout
- .queue(cursor::MoveDown(self.rows))?
- .queue(cursor::MoveToColumn(0))?;
-
- // resize the spans starting from current line
- let cols = self.cols; // borrowck strikes again wtf, can't move this into `.map(...)`??? IT'S COPY.
- let viewport_spans = self.lines
- .iter()
- .enumerate()
- .rev()
- .skip(self.current_line_ptr)
- .map(|(idx, line)| Span::new(idx, line, cols))
- .take(self.rows.into());
-
- // blit lines to the viewport
- let mut rows_drawn = 0;
-
- for span in viewport_spans {
- let mut lines = span.line_iter().rev();
-
- while rows_drawn <= self.rows.into() {
- match lines.next() {
- Some(line) => {
- rows_drawn += 1;
- self.max_line_ptr = span.line_idx;
- stdout
- .queue(Clear(ClearType::CurrentLine))?
- .queue(Print(line))?
- .queue(cursor::MoveUp(1))?
- .queue(cursor::MoveToColumn(0))?;
- },
-
- None => { break },
- }
- }
- }
-
- // clear remaining lines
- while rows_drawn < self.rows.into() {
- rows_drawn += 1;
- stdout
- .queue(Clear(ClearType::CurrentLine))?
- .queue(cursor::MoveUp(1))?
- .queue(cursor::MoveToColumn(0))?;
- }
-
- let buf_info = format!(
- "cur: {}, max: {}, len: {}, rows: {}",
- self.current_line_ptr,
- self.max_line_ptr,
- self.lines.len(),
- self.rows,
- );
-
- stdout
- .queue(cursor::MoveTo(0, 1))?
- .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
- .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
- .queue(Clear(ClearType::CurrentLine))?
- .queue(Print(&buf_info))?
- .queue(style::ResetColor)?;
-
- Ok(())
- }
+ pub fn new(cols: u16, rows: u16) -> Self {
+ Self {
+ cols: cols,
+ rows: rows,
+
+ lines: vec![],
+ current_line_ptr: 0,
+ max_line_ptr: 0,
+
+ needs_repaint: true,
+ }
+ }
+
+ pub fn resise(&mut self, cols: u16, rows: u16) {
+ self.needs_repaint = true;
+ self.cols = cols;
+ self.rows = rows;
+
+ // If there are more lines that could be shown we push the current line pointer
+ // down in an effort to fill the window w/ the add'l lines.
+ let delta_buffer = self.lines.len() - self.current_line_ptr;
+
+ let rows: usize = self.rows.into();
+ if rows > delta_buffer {
+ self.current_line_ptr -= cmp::min(self.current_line_ptr, rows - delta_buffer);
+ }
+ }
+
+ pub fn append_line(&mut self, line: String) {
+ self.lines.push(line);
+ self.needs_repaint = true;
+ }
+
+ pub fn scroll_up(&mut self) {
+ let new_line_ptr = self.current_line_ptr + 1;
+
+ if new_line_ptr <= self.lines.len() && self.max_line_ptr > 0 {
+ self.current_line_ptr += 1;
+ self.needs_repaint = true;
+ }
+ }
+
+ pub fn scroll_down(&mut self) {
+ if self.current_line_ptr > 0 {
+ self.current_line_ptr -= 1;
+ self.needs_repaint = true;
+ }
+ }
+
+ /// The caller *must* set the cursor to the row where this buffer should be rendered.
+ /// The buffer will ensure that it paints _at most_ `self.rows` lines onto the terminal.
+ /// NOTE: this is a no-op if the buffer does not need to be repainted.
+ pub fn render(&mut self, stdout: &mut std::io::Stdout) -> anyhow::Result<()> {
+ if !self.needs_repaint {
+ return Ok(());
+ }
+ self.needs_repaint = false;
+
+ // move to bottom-most row.
+ stdout
+ .queue(cursor::MoveDown(self.rows))?
+ .queue(cursor::MoveToColumn(0))?;
+
+ // resize the spans starting from current line
+ let cols = self.cols; // borrowck strikes again wtf, can't move this into `.map(...)`??? IT'S COPY.
+ let viewport_spans = self
+ .lines
+ .iter()
+ .enumerate()
+ .rev()
+ .skip(self.current_line_ptr)
+ .map(|(idx, line)| Span::new(idx, line, cols))
+ .take(self.rows.into());
+
+ // blit lines to the viewport
+ let mut rows_drawn = 0;
+
+ for span in viewport_spans {
+ let mut lines = span.line_iter().rev();
+
+ while rows_drawn <= self.rows.into() {
+ match lines.next() {
+ Some(line) => {
+ rows_drawn += 1;
+ self.max_line_ptr = span.line_idx;
+ stdout
+ .queue(Clear(ClearType::CurrentLine))?
+ .queue(Print(line))?
+ .queue(cursor::MoveUp(1))?
+ .queue(cursor::MoveToColumn(0))?;
+ }
+
+ None => break,
+ }
+ }
+ }
+
+ // clear remaining lines
+ while rows_drawn < self.rows.into() {
+ rows_drawn += 1;
+ stdout
+ .queue(Clear(ClearType::CurrentLine))?
+ .queue(cursor::MoveUp(1))?
+ .queue(cursor::MoveToColumn(0))?;
+ }
+
+ let buf_info = format!(
+ "cur: {}, max: {}, len: {}, rows: {}",
+ self.current_line_ptr,
+ self.max_line_ptr,
+ self.lines.len(),
+ self.rows,
+ );
+
+ stdout
+ .queue(cursor::MoveTo(0, 1))?
+ .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
+ .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
+ .queue(Clear(ClearType::CurrentLine))?
+ .queue(Print(&buf_info))?
+ .queue(style::ResetColor)?;
+
+ Ok(())
+ }
}
pub struct InputBuffer {
- pos: usize,
- start: usize,
- end: usize,
+ pos: usize,
+ start: usize,
+ end: usize,
- buffer: String,
+ buffer: String,
}
impl InputBuffer {
- pub fn new() -> Self {
- Self {
- // TODO: should probably init w/ some size ...
- pos: 0,
- start: 0,
- end: 0,
-
- buffer: String::new(),
- }
- }
-
- pub fn insert(&mut self, ch: char) {
- if self.pos < self.buffer.len() {
- self.buffer.insert(self.pos, ch);
- } else {
- self.buffer.push(ch);
- }
-
- self.cursor_right();
- }
-
- pub fn backspace(&mut self) {
- if self.pos < self.buffer.len() {
- self.buffer.remove(self.pos - 1);
- } else {
- self.buffer.pop();
- }
-
- self.cursor_left();
- }
-
- pub fn delete(&mut self) {
- if self.pos < self.buffer.len() {
- self.buffer.remove(self.pos);
- }
- }
-
- pub fn cursor_left(&mut self) {
- if self.pos <= 0 { return; }
-
- self.pos -= 1;
-
- if self.pos < self.start {
- self.start -= 1;
- self.end -= 1;
- }
- }
-
- pub fn cursor_right(&mut self) {
- if self.pos >= self.buffer.len() { return; }
-
- self.pos += 1;
-
- if self.pos > self.end {
- self.start += 1;
- self.end += 1;
- }
- }
-
- pub fn goto_head(&mut self) {
- self.pos = 0;
- self.start = 0;
- self.end = self.span_len();
- }
-
- pub fn goto_tail(&mut self) {
- self.pos = self.buffer.len();
- self.end = self.pos;
- self.start = self.pos - self.span_len();
- }
-
- pub fn take(&mut self) -> String {
- let mut old_buffer = String::new();
- std::mem::swap(&mut self.buffer, &mut old_buffer);
-
- self.pos = 0;
- self.start = 0;
- self.end = 0;
-
- return old_buffer;
- }
-
- pub fn render(&mut self, stdout: &mut io::Stdout, col: u16, row: u16, width: u16) -> anyhow::Result<()> {
- // calculate some info about our prompt & buffer size
- let prompt = "> ";
- let width: usize = width.into();
- let width_with_prompt = width - 1 - prompt.len();
-
- // adjust viewspan to match current terminal width (less the prompt)
- if self.span_len() > width_with_prompt {
- // shrink the view span
- let delta = self.span_len() - width_with_prompt;
-
- let trailing_width = self.buffer.len() - self.pos;
-
- if self.pos + trailing_width > width_with_prompt {
- let start_to_pos = self.pos - self.start;
- self.start = self.pos;
-
- if start_to_pos < delta {
- self.end -= delta - start_to_pos;
- }
- } else {
- self.end -= delta;
- }
- } else if self.span_len() < width_with_prompt {
- // grow the view span
- let mut delta = width_with_prompt - self.span_len();
-
- // consume delta to display tail of the buffer outside the viewport.
- if self.buffer.len() > self.end {
- let overhang = cmp::min(delta, self.buffer.len() - self.end);
- self.end += overhang;
- delta -= overhang;
- }
-
- // repeat the same for the head of the buffer outside the viewport.
- if self.start > 0 {
- let overhang = cmp::min(delta, self.start);
- self.start -= overhang;
- delta -= overhang;
- }
-
- // if we still have new columns place them on the end.
- //
- // this should only be the case if the input span is larger than
- // the current buffer.
- self.end += delta;
- }
-
- // draw status info on line 2
- // let buf_info = format!(
- // "start: {}, end: {}, pos: {}, spanlen: {}, buflen: {}",
- // self.start,
- // self.end,
- // self.pos,
- // self.span_len(),
- // self.buffer.len()
- // );
-
- // stdout
- // .queue(cursor::MoveTo(0, 1))?
- // .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
- // .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
- // .queue(Clear(ClearType::CurrentLine))?
- // .queue(Print(&buf_info))?
- // .queue(style::ResetColor)?;
-
- // clear the line & draw our prompt
- stdout
- .queue(cursor::MoveTo(col, row))?
- .queue(Clear(ClearType::CurrentLine))?
- .queue(Print(prompt.to_string()))?;
-
- // render the current viewport & update the cursor
- let beg = self.start;
- let end = cmp::min(self.buffer.len(), self.end);
- let relative_cursor_pos = (self.pos - self.start) + prompt.len() + 1;
- assert!(relative_cursor_pos <= u16::MAX.into());
-
- stdout
- .queue(Print(&self.buffer[beg..end]))?
- .queue(cursor::MoveToColumn(relative_cursor_pos as u16))?;
-
- Ok(())
- }
-
- fn span_len(&self) -> usize {
- self.end - self.start
- }
+ pub fn new() -> Self {
+ Self {
+ // TODO: should probably init w/ some size ...
+ pos: 0,
+ start: 0,
+ end: 0,
+
+ buffer: String::new(),
+ }
+ }
+
+ pub fn insert(&mut self, ch: char) {
+ if self.pos < self.buffer.len() {
+ self.buffer.insert(self.pos, ch);
+ } else {
+ self.buffer.push(ch);
+ }
+
+ self.cursor_right();
+ }
+
+ pub fn backspace(&mut self) {
+ if self.pos < self.buffer.len() {
+ self.buffer.remove(self.pos - 1);
+ } else {
+ self.buffer.pop();
+ }
+
+ self.cursor_left();
+ }
+
+ pub fn delete(&mut self) {
+ if self.pos < self.buffer.len() {
+ self.buffer.remove(self.pos);
+ }
+ }
+
+ pub fn cursor_left(&mut self) {
+ if self.pos <= 0 {
+ return;
+ }
+
+ self.pos -= 1;
+
+ if self.pos < self.start {
+ self.start -= 1;
+ self.end -= 1;
+ }
+ }
+
+ pub fn cursor_right(&mut self) {
+ if self.pos >= self.buffer.len() {
+ return;
+ }
+
+ self.pos += 1;
+
+ if self.pos > self.end {
+ self.start += 1;
+ self.end += 1;
+ }
+ }
+
+ pub fn goto_head(&mut self) {
+ self.pos = 0;
+ self.start = 0;
+ self.end = self.span_len();
+ }
+
+ pub fn goto_tail(&mut self) {
+ self.pos = self.buffer.len();
+ self.end = self.pos;
+ self.start = self.pos - self.span_len();
+ }
+
+ pub fn take(&mut self) -> String {
+ let mut old_buffer = String::new();
+ std::mem::swap(&mut self.buffer, &mut old_buffer);
+
+ self.pos = 0;
+ self.start = 0;
+ self.end = 0;
+
+ return old_buffer;
+ }
+
+ pub fn render(
+ &mut self,
+ stdout: &mut io::Stdout,
+ col: u16,
+ row: u16,
+ width: u16,
+ ) -> anyhow::Result<()> {
+ // calculate some info about our prompt & buffer size
+ let prompt = "> ";
+ let width: usize = width.into();
+ let width_with_prompt = width - 1 - prompt.len();
+
+ // adjust viewspan to match current terminal width (less the prompt)
+ if self.span_len() > width_with_prompt {
+ // shrink the view span
+ let delta = self.span_len() - width_with_prompt;
+
+ let trailing_width = self.buffer.len() - self.pos;
+
+ if self.pos + trailing_width > width_with_prompt {
+ let start_to_pos = self.pos - self.start;
+ self.start = self.pos;
+
+ if start_to_pos < delta {
+ self.end -= delta - start_to_pos;
+ }
+ } else {
+ self.end -= delta;
+ }
+ } else if self.span_len() < width_with_prompt {
+ // grow the view span
+ let mut delta = width_with_prompt - self.span_len();
+
+ // consume delta to display tail of the buffer outside the viewport.
+ if self.buffer.len() > self.end {
+ let overhang = cmp::min(delta, self.buffer.len() - self.end);
+ self.end += overhang;
+ delta -= overhang;
+ }
+
+ // repeat the same for the head of the buffer outside the viewport.
+ if self.start > 0 {
+ let overhang = cmp::min(delta, self.start);
+ self.start -= overhang;
+ delta -= overhang;
+ }
+
+ // if we still have new columns place them on the end.
+ //
+ // this should only be the case if the input span is larger than
+ // the current buffer.
+ self.end += delta;
+ }
+
+ // draw status info on line 2
+ // let buf_info = format!(
+ // "start: {}, end: {}, pos: {}, spanlen: {}, buflen: {}",
+ // self.start,
+ // self.end,
+ // self.pos,
+ // self.span_len(),
+ // self.buffer.len()
+ // );
+
+ // stdout
+ // .queue(cursor::MoveTo(0, 1))?
+ // .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
+ // .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
+ // .queue(Clear(ClearType::CurrentLine))?
+ // .queue(Print(&buf_info))?
+ // .queue(style::ResetColor)?;
+
+ // clear the line & draw our prompt
+ stdout
+ .queue(cursor::MoveTo(col, row))?
+ .queue(Clear(ClearType::CurrentLine))?
+ .queue(Print(prompt.to_string()))?;
+
+ // render the current viewport & update the cursor
+ let beg = self.start;
+ let end = cmp::min(self.buffer.len(), self.end);
+ let relative_cursor_pos = (self.pos - self.start) + prompt.len() + 1;
+ assert!(relative_cursor_pos <= u16::MAX.into());
+
+ stdout
+ .queue(Print(&self.buffer[beg..end]))?
+ .queue(cursor::MoveToColumn(relative_cursor_pos as u16))?;
+
+ Ok(())
+ }
+
+ fn span_len(&self) -> usize {
+ self.end - self.start
+ }
}
M linetest/src/shell/ui/mod.rs => linetest/src/shell/ui/mod.rs +45 -45
@@ 1,9 1,9 @@
use crossterm::{
- cursor,
- event::{self, Event},
- style,
- terminal::{EnterAlternateScreen, LeaveAlternateScreen},
- ExecutableCommand,
+ cursor,
+ event::{self, Event},
+ style,
+ terminal::{EnterAlternateScreen, LeaveAlternateScreen},
+ ExecutableCommand,
};
use std::io;
@@ 16,44 16,44 @@ pub const COLOR_TITLE_BG: style::Color = style::Color::Blue;
pub const COLOR_TITLE_FG: style::Color = style::Color::Grey;
pub fn start_task() -> anyhow::Result<()> {
- let mut shell = term::TermInterface::new()?;
- let poll_hz = Duration::from_millis(1000 / 120);
-
- // take over user's terminal
- crossterm::terminal::enable_raw_mode()?;
-
- io::stdout()
- .execute(EnterAlternateScreen)?
- .execute(cursor::DisableBlinking)?
- .execute(cursor::Hide)?;
-
- // loop over crossterm events every 120Hz
- while shell.is_running() {
- shell.read_connections()?;
- shell.present()?;
- if !event::poll(poll_hz)? { continue }
-
- match event::read()? {
- Event::Key(key_evt) => {
- shell.event_keydown(key_evt);
- },
-
- Event::Resize(cols, rows) => {
- shell.event_resize(cols, rows);
- },
-
- _ => {},
- }
-
-
- }
-
- // return to user's previous terminal
- io::stdout()
- .execute(LeaveAlternateScreen)?
- .execute(cursor::Show)?;
-
- crossterm::terminal::disable_raw_mode()?;
-
- Ok(())
+ let mut shell = term::TermInterface::new()?;
+ let poll_hz = Duration::from_millis(1000 / 120);
+
+ // take over user's terminal
+ crossterm::terminal::enable_raw_mode()?;
+
+ io::stdout()
+ .execute(EnterAlternateScreen)?
+ .execute(cursor::DisableBlinking)?
+ .execute(cursor::Hide)?;
+
+ // loop over crossterm events every 120Hz
+ while shell.is_running() {
+ shell.read_connections()?;
+ shell.present()?;
+ if !event::poll(poll_hz)? {
+ continue;
+ }
+
+ match event::read()? {
+ Event::Key(key_evt) => {
+ shell.event_keydown(key_evt);
+ }
+
+ Event::Resize(cols, rows) => {
+ shell.event_resize(cols, rows);
+ }
+
+ _ => {}
+ }
+ }
+
+ // return to user's previous terminal
+ io::stdout()
+ .execute(LeaveAlternateScreen)?
+ .execute(cursor::Show)?;
+
+ crossterm::terminal::disable_raw_mode()?;
+
+ Ok(())
}
M linetest/src/shell/ui/term.rs => linetest/src/shell/ui/term.rs +335 -306
@@ 1,330 1,359 @@
-use crate::shell::{net, ConnMsg, ConnectionMap, Error};
use super::buffer::{Buffer, InputBuffer};
use super::{COLOR_TITLE_BG, COLOR_TITLE_FG};
+use crate::shell::{net, ConnMsg, ConnectionMap, Error};
use std::io::{self, Write};
use crossterm::{
- cursor,
- event::{self, KeyEvent, KeyModifiers},
- style::{self, Print},
- terminal::{self, Clear, ClearType},
- QueueableCommand,
+ cursor,
+ event::{self, KeyEvent, KeyModifiers},
+ style::{self, Print},
+ terminal::{self, Clear, ClearType},
+ QueueableCommand,
};
pub struct TermInterface {
- term_cols: u16,
- term_rows: u16,
+ term_cols: u16,
+ term_rows: u16,
- buffer: InputBuffer,
- syslog: Buffer,
- is_running: bool,
+ buffer: InputBuffer,
+ syslog: Buffer,
+ is_running: bool,
- conn_map: ConnectionMap,
- current_buffer: Option<i64>,
+ conn_map: ConnectionMap,
+ current_buffer: Option<i64>,
}
impl TermInterface {
- pub fn new() -> anyhow::Result<Self> {
- let (cols, rows) = terminal::size()?;
-
- Ok(Self {
- term_cols: cols,
- term_rows: rows,
-
- buffer: InputBuffer::new(),
- syslog: Buffer::new(cols, rows - 3),
- is_running: true,
-
- conn_map: ConnectionMap::new(),
- current_buffer: None,
- })
- }
-
- pub fn read_connections(&mut self) -> anyhow::Result<()> {
- let conn_messages = self.conn_map.poll_messages();
-
- for (conn_id, conn_msg_q) in conn_messages {
- for conn_msg in conn_msg_q {
- match conn_msg {
- ConnMsg::LogLine { msg } => {
- let log_line = format!("conn({}): {}", conn_id, msg);
- self.syslog.append_line(log_line);
- },
-
- ConnMsg::BufLine { id, msg } => {
- // TODO: buffer management
- let log_line = format!("{}@{}: {}", conn_id, id, msg);
- self.syslog.append_line(log_line);
- },
-
- ConnMsg::RejectTemp { reason } => {
- let log_line = format!("conn({}): {}", conn_id, reason);
- self.syslog.append_line(log_line);
- },
-
- _ => {},
- }
- }
-
- }
-
- Ok(())
- }
-
- pub fn present(&mut self) -> anyhow::Result<()> {
- let mut stdout = io::stdout();
- stdout.queue(cursor::Hide)?;
-
- // draw titlebar
- stdout
- .queue(cursor::MoveTo(0, 0))?
- .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
- .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
- .queue(Clear(ClearType::CurrentLine))?
- .queue(Print(self.status_line()))?
- .queue(style::ResetColor)?;
-
- // draw the input buffer and save its cursor position
- // TODO: render this lazily?
- self.buffer.render(&mut stdout, 0, self.term_rows, self.term_cols)?;
- stdout.queue(cursor::SavePosition)?;
-
- // draw the syslog buffer
- // TODO: draw current buffer
- stdout.queue(cursor::MoveTo(0, 1))?; // move to row 2: where we want to paint the buffer.
- self.syslog.render(&mut stdout)?;
-
- // restore input cursor position
- stdout
- .queue(cursor::RestorePosition)?
- .queue(cursor::Show)?;
-
- Ok(stdout.flush()?)
- }
-
- pub fn event_keydown(&mut self, event: event::KeyEvent) {
- match event {
- KeyEvent { code, modifiers } if modifiers.contains(KeyModifiers::CONTROL) => match code {
- event::KeyCode::Char('q') => { self.is_running = false; },
- _ => {},
- },
-
- KeyEvent { code, modifiers } => match code {
- event::KeyCode::Char(ch) if modifiers.contains(KeyModifiers::SHIFT) => {
-
- self.buffer.insert(ch.to_ascii_uppercase());
- },
-
- event::KeyCode::Char(ch) => {
- self.buffer.insert(ch);
- },
-
- event::KeyCode::Backspace => {
- self.buffer.backspace();
- },
-
- event::KeyCode::Delete => {
- self.buffer.delete();
- },
-
- event::KeyCode::Left => {
- self.buffer.cursor_left();
- },
-
- event::KeyCode::Right => {
- self.buffer.cursor_right();
- },
-
- event::KeyCode::Enter => {
- self.process_buffer();
- },
-
- event::KeyCode::PageUp => {
- self.syslog.scroll_up();
- },
-
- event::KeyCode::PageDown => {
- self.syslog.scroll_down();
- },
-
- event::KeyCode::Home => {
- self.buffer.goto_head();
- },
-
- event::KeyCode::End => {
- self.buffer.goto_tail();
- },
-
- _ => {},
- },
- }
- }
-
- pub fn event_resize(&mut self, cols: u16, rows: u16) {
- self.term_cols = cols;
- self.term_rows = rows;
-
- // syslog buffer is max terminal height - 2 rows for status bar - 1 row for input buffer
- self.syslog.resise(self.term_cols, self.term_rows - 3);
- }
-
- pub fn is_running(&self) -> bool { self.is_running }
-
- fn status_line(&self) -> String {
- format!("linetest - rows [{}] cols [{}]", self.term_rows, self.term_cols)
- }
-
- fn process_buffer(&mut self) {
- let result = match self.buffer.take() {
- cmd if cmd.starts_with("/connect") => self.cmd_connect(cmd),
- cmd if cmd.starts_with("/sessions") => self.cmd_list_sessions(cmd),
- cmd if cmd.starts_with("/nick") => self.cmd_nick(cmd),
- cmd if cmd.starts_with("/join") => self.cmd_join(cmd),
- cmd if cmd.starts_with("/part") => self.cmd_part(cmd),
-
-
- cmd if cmd.starts_with("/test") => self.cmd_test(cmd),
- cmd if cmd.starts_with("/e") => self.cmd_echo(cmd),
- cmd => self.cmd_message(cmd),
- };
-
- if let Err(msg) = result {
- self.syslog.append_line(format!("command error: {:?}", msg));
- }
- }
-
- fn cmd_connect(&mut self, buf: String) -> anyhow::Result<()> {
- let mut args = buf.split(" ");
- let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
- let hostname = args.next().ok_or_else(|| Error::MissingArgument)?;
- let nickname = args.next().ok_or_else(|| Error::MissingArgument)?;
-
- let (conn_id, conn_mx) = self.conn_map.create_connection(hostname, nickname)?;
- let conn = self.conn_map.connections[&conn_id].clone();
-
- smol::Task::spawn(async move {
+ pub fn new() -> anyhow::Result<Self> {
+ let (cols, rows) = terminal::size()?;
+
+ Ok(Self {
+ term_cols: cols,
+ term_rows: rows,
+
+ buffer: InputBuffer::new(),
+ syslog: Buffer::new(cols, rows - 3),
+ is_running: true,
+
+ conn_map: ConnectionMap::new(),
+ current_buffer: None,
+ })
+ }
+
+ pub fn read_connections(&mut self) -> anyhow::Result<()> {
+ let conn_messages = self.conn_map.poll_messages();
+
+ for (conn_id, conn_msg_q) in conn_messages {
+ for conn_msg in conn_msg_q {
+ match conn_msg {
+ ConnMsg::LogLine { msg } => {
+ let log_line = format!("conn({}): {}", conn_id, msg);
+ self.syslog.append_line(log_line);
+ }
+
+ ConnMsg::BufLine { id, msg } => {
+ // TODO: buffer management
+ let log_line = format!("{}@{}: {}", conn_id, id, msg);
+ self.syslog.append_line(log_line);
+ }
+
+ ConnMsg::RejectTemp { reason } => {
+ let log_line = format!("conn({}): {}", conn_id, reason);
+ self.syslog.append_line(log_line);
+ }
+
+ _ => {}
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ pub fn present(&mut self) -> anyhow::Result<()> {
+ let mut stdout = io::stdout();
+ stdout.queue(cursor::Hide)?;
+
+ // draw titlebar
+ stdout
+ .queue(cursor::MoveTo(0, 0))?
+ .queue(style::SetBackgroundColor(COLOR_TITLE_BG))?
+ .queue(style::SetForegroundColor(COLOR_TITLE_FG))?
+ .queue(Clear(ClearType::CurrentLine))?
+ .queue(Print(self.status_line()))?
+ .queue(style::ResetColor)?;
+
+ // draw the input buffer and save its cursor position
+ // TODO: render this lazily?
+ self.buffer
+ .render(&mut stdout, 0, self.term_rows, self.term_cols)?;
+ stdout.queue(cursor::SavePosition)?;
+
+ // draw the syslog buffer
+ // TODO: draw current buffer
+ stdout.queue(cursor::MoveTo(0, 1))?; // move to row 2: where we want to paint the buffer.
+ self.syslog.render(&mut stdout)?;
+
+ // restore input cursor position
+ stdout.queue(cursor::RestorePosition)?.queue(cursor::Show)?;
+
+ Ok(stdout.flush()?)
+ }
+
+ pub fn event_keydown(&mut self, event: event::KeyEvent) {
+ match event {
+ KeyEvent { code, modifiers } if modifiers.contains(KeyModifiers::CONTROL) => match code
+ {
+ event::KeyCode::Char('q') => {
+ self.is_running = false;
+ }
+ _ => {}
+ },
+
+ KeyEvent { code, modifiers } => match code {
+ event::KeyCode::Char(ch) if modifiers.contains(KeyModifiers::SHIFT) => {
+ self.buffer.insert(ch.to_ascii_uppercase());
+ }
+
+ event::KeyCode::Char(ch) => {
+ self.buffer.insert(ch);
+ }
+
+ event::KeyCode::Backspace => {
+ self.buffer.backspace();
+ }
+
+ event::KeyCode::Delete => {
+ self.buffer.delete();
+ }
+
+ event::KeyCode::Left => {
+ self.buffer.cursor_left();
+ }
+
+ event::KeyCode::Right => {
+ self.buffer.cursor_right();
+ }
+
+ event::KeyCode::Enter => {
+ self.process_buffer();
+ }
+
+ event::KeyCode::PageUp => {
+ self.syslog.scroll_up();
+ }
+
+ event::KeyCode::PageDown => {
+ self.syslog.scroll_down();
+ }
+
+ event::KeyCode::Home => {
+ self.buffer.goto_head();
+ }
+
+ event::KeyCode::End => {
+ self.buffer.goto_tail();
+ }
+
+ _ => {}
+ },
+ }
+ }
+
+ pub fn event_resize(&mut self, cols: u16, rows: u16) {
+ self.term_cols = cols;
+ self.term_rows = rows;
+
+ // syslog buffer is max terminal height - 2 rows for status bar - 1 row for input buffer
+ self.syslog.resise(self.term_cols, self.term_rows - 3);
+ }
+
+ pub fn is_running(&self) -> bool {
+ self.is_running
+ }
+
+ fn status_line(&self) -> String {
+ format!(
+ "linetest - rows [{}] cols [{}]",
+ self.term_rows, self.term_cols
+ )
+ }
+
+ fn process_buffer(&mut self) {
+ let result = match self.buffer.take() {
+ cmd if cmd.starts_with("/connect") => self.cmd_connect(cmd),
+ cmd if cmd.starts_with("/sessions") => self.cmd_list_sessions(cmd),
+ cmd if cmd.starts_with("/nick") => self.cmd_nick(cmd),
+ cmd if cmd.starts_with("/join") => self.cmd_join(cmd),
+ cmd if cmd.starts_with("/part") => self.cmd_part(cmd),
+
+ cmd if cmd.starts_with("/test") => self.cmd_test(cmd),
+ cmd if cmd.starts_with("/e") => self.cmd_echo(cmd),
+ cmd => self.cmd_message(cmd),
+ };
+
+ if let Err(msg) = result {
+ self.syslog.append_line(format!("command error: {:?}", msg));
+ }
+ }
+
+ fn cmd_connect(&mut self, buf: String) -> anyhow::Result<()> {
+ let mut args = buf.split(" ");
+ let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
+ let hostname = args.next().ok_or_else(|| Error::MissingArgument)?;
+ let nickname = args.next().ok_or_else(|| Error::MissingArgument)?;
+
+ let (conn_id, conn_mx) = self.conn_map.create_connection(hostname, nickname)?;
+ let conn = self.conn_map.connections[&conn_id].clone();
+
+ smol::Task::spawn(async move {
let net_outbox = conn_mx.outbox.clone();
if let Err(msg) = net::start_task(conn, conn_mx).await {
let log_line = ConnMsg::LogLine { msg: format!("error connecting: {:?})", msg) };
-
+
if let Err(_) = net_outbox.send(log_line).await {
panic!("BUG: double fault - connection mailbox is unavailable while reporting connection error.");
}
}
}).detach();
-
- Ok(())
-
- }
-
- fn cmd_echo(&mut self, buf: String) -> anyhow::Result<()> {
- let args = buf.splitn(2, " ");
-
- if let Some(text) = args.skip(1).next() {
- self.syslog.append_line(text.to_string());
- }
-
- Ok(())
- }
-
- fn cmd_test(&mut self, _buf: String) -> anyhow::Result<()> {
- // spam a bunch of shit to the terminal so we have a buffer to test with
- for i in 1..150 {
- self.syslog.append_line(format!("this is some rather long string of text to test with, the {}th such string", i))
- }
- Ok(())
- }
-
- fn cmd_list_sessions(&mut self, _buf: String) -> anyhow::Result<()> {
- for (_id, conn) in &self.conn_map.connections {
- self.syslog.append_line(format!("server {} @ {}: connected? {}", conn.id, conn.addr, conn.is_connected));
- }
-
- Ok(())
- }
-
- fn cmd_join(&mut self, buf: String) -> anyhow::Result<()> {
- let mut args = buf.split(" ");
- let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
-
- let session_no = args.next()
- .ok_or_else(|| Error::MissingArgument)
- .and_then(|arg|
- // ick ...
- arg.parse::<i64>().map_err(|err| { Error::ParseError { pos: "session_no", source: err } })
- )?;
-
- let room_name = args.next().ok_or_else(|| Error::MissingArgument)?;
-
- let conn = self.conn_map.socket_tx.get(&session_no)
- .ok_or_else(|| Error::MissingConnectionId)?;
- smol::block_on(conn.send(ConnMsg::JoinRoom {
- room: room_name.to_string(),
- }))?;
-
- Ok(())
- }
-
- fn cmd_part(&mut self, buf: String) -> anyhow::Result<()> {
- let mut args = buf.split(" ");
- let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
-
- let session_no = args.next()
- .ok_or_else(|| Error::MissingArgument)
- .and_then(|arg|
- // ick ...
- arg.parse::<i64>().map_err(|err| { Error::ParseError { pos: "session_no", source: err } })
- )?;
-
- let room_name = args.next().ok_or_else(|| Error::MissingArgument)?;
-
- let conn = self.conn_map.socket_tx.get(&session_no)
- .ok_or_else(|| Error::MissingConnectionId)?;
- smol::block_on(conn.send(ConnMsg::PartRoom {
- room: room_name.to_string(),
- }))?;
-
- Ok(())
- }
-
- fn cmd_nick(&mut self, buf: String) -> anyhow::Result<()> {
- let mut args = buf.split(" ");
- let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
-
- let session_no = args.next()
- .ok_or_else(|| Error::MissingArgument)
- .and_then(|arg|
- // ick ...
- arg.parse::<i64>().map_err(|err| { Error::ParseError { pos: "session_no", source: err } })
- )?;
-
- let nick = args.next().ok_or_else(|| Error::MissingArgument)?;
- let conn = self.conn_map.socket_tx.get(&session_no)
- .ok_or_else(|| Error::MissingConnectionId)?;
-
-
- smol::block_on(conn.send(ConnMsg::ChangeNick { name: nick.to_string() }))?;
- Ok(())
- }
-
- fn cmd_message(&mut self, buf: String) -> anyhow::Result<()> {
- match self.current_buffer {
- Some(_) => { /* TODO: send message to connection */ }
- None => {
- for (_id, conn) in &mut self.conn_map.socket_tx {
- smol::block_on(async {
- let _ = conn.send(ConnMsg::BufLine {
- id: "example".to_string(),
- msg: buf.clone(),
- }).await;
- });
- }
- //self.lines.push(buf)
- },
- }
-
- Ok(())
- }
+ Ok(())
+ }
+
+ fn cmd_echo(&mut self, buf: String) -> anyhow::Result<()> {
+ let args = buf.splitn(2, " ");
+
+ if let Some(text) = args.skip(1).next() {
+ self.syslog.append_line(text.to_string());
+ }
+
+ Ok(())
+ }
+
+ fn cmd_test(&mut self, _buf: String) -> anyhow::Result<()> {
+ // spam a bunch of shit to the terminal so we have a buffer to test with
+ for i in 1..150 {
+ self.syslog.append_line(format!(
+ "this is some rather long string of text to test with, the {}th such string",
+ i
+ ))
+ }
+ Ok(())
+ }
+
+ fn cmd_list_sessions(&mut self, _buf: String) -> anyhow::Result<()> {
+ for (_id, conn) in &self.conn_map.connections {
+ self.syslog.append_line(format!(
+ "server {} @ {}: connected? {}",
+ conn.id, conn.addr, conn.is_connected
+ ));
+ }
+
+ Ok(())
+ }
+
+ fn cmd_join(&mut self, buf: String) -> anyhow::Result<()> {
+ let mut args = buf.split(" ");
+ let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
+
+ let session_no = args
+ .next()
+ .ok_or_else(|| Error::MissingArgument)
+ .and_then(|arg| {
+ arg.parse::<i64>().map_err(|err| Error::ParseError {
+ pos: "session_no",
+ source: err,
+ })
+ })?;
+
+ let room_name = args.next().ok_or_else(|| Error::MissingArgument)?;
+
+ let conn = self
+ .conn_map
+ .socket_tx
+ .get(&session_no)
+ .ok_or_else(|| Error::MissingConnectionId)?;
+ smol::block_on(conn.send(ConnMsg::JoinRoom {
+ room: room_name.to_string(),
+ }))?;
+
+ Ok(())
+ }
+
+ fn cmd_part(&mut self, buf: String) -> anyhow::Result<()> {
+ let mut args = buf.split(" ");
+ let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
+
+ let session_no = args
+ .next()
+ .ok_or_else(|| Error::MissingArgument)
+ .and_then(|arg| {
+ arg.parse::<i64>().map_err(|err| Error::ParseError {
+ pos: "session_no",
+ source: err,
+ })
+ })?;
+
+ let room_name = args.next().ok_or_else(|| Error::MissingArgument)?;
+
+ let conn = self
+ .conn_map
+ .socket_tx
+ .get(&session_no)
+ .ok_or_else(|| Error::MissingConnectionId)?;
+ smol::block_on(conn.send(ConnMsg::PartRoom {
+ room: room_name.to_string(),
+ }))?;
+
+ Ok(())
+ }
+
+ fn cmd_nick(&mut self, buf: String) -> anyhow::Result<()> {
+ let mut args = buf.split(" ");
+ let _cmd_name = args.next().ok_or_else(|| Error::MissingArgument)?;
+
+ let session_no = args
+ .next()
+ .ok_or_else(|| Error::MissingArgument)
+ .and_then(|arg| {
+ arg.parse::<i64>().map_err(|err| Error::ParseError {
+ pos: "session_no",
+ source: err,
+ })
+ })?;
+
+ let nick = args.next().ok_or_else(|| Error::MissingArgument)?;
+ let conn = self
+ .conn_map
+ .socket_tx
+ .get(&session_no)
+ .ok_or_else(|| Error::MissingConnectionId)?;
+
+ smol::block_on(conn.send(ConnMsg::ChangeNick {
+ name: nick.to_string(),
+ }))?;
+ Ok(())
+ }
+
+ fn cmd_message(&mut self, buf: String) -> anyhow::Result<()> {
+ match self.current_buffer {
+ Some(_) => { /* TODO: send message to connection */ }
+ None => {
+ for (_id, conn) in &mut self.conn_map.socket_tx {
+ smol::block_on(async {
+ let _ = conn
+ .send(ConnMsg::BufLine {
+ id: "example".to_string(),
+ msg: buf.clone(),
+ })
+ .await;
+ });
+ }
+ //self.lines.push(buf)
+ }
+ }
+
+ Ok(())
+ }
}
M smolboi/src/bin/server.rs => smolboi/src/bin/server.rs +86 -85
@@ 1,5 1,6 @@
-#[macro_use] extern crate log;
-use async_channel::{unbounded, Sender, Receiver};
+#[macro_use]
+extern crate log;
+use async_channel::{unbounded, Receiver, Sender};
use async_dup::Arc;
use futures_util::{future, select, StreamExt};
use smol::{Async, Executor};
@@ 13,30 14,31 @@ use smolboi::server::Server;
use smolboi::util;
fn main() -> anyhow::Result<()> {
- env_logger::init();
- info!("logging subsystem initialized");
-
- // bootstrap the runtime
- let ex = Arc::new(Executor::new());
- let rt_ex = ex.clone();
- let rt = thread::spawn(move || { smol::block_on(rt_ex.clone().run(future::pending::<()>())) });
- info!("started single-threaded executor");
-
- // setup the acceptor
- let address = "127.0.0.1:7777".parse::<SocketAddr>()?;
- let listener = Async::<TcpListener>::bind(address)?;
- info!("set up listener on 127.0.0.1:7777");
-
- let (events_tx, events_rx) = unbounded();
-
- ex.spawn(acceptor_task(listener, events_tx.clone(), ex.clone())).detach();
- ex.spawn(broker_task(events_rx)).detach();
-
- if let Err(msg) = rt.join() {
- warn!("executor exited w/ error: {:?}", msg);
- }
-
- Ok(())
+ env_logger::init();
+ info!("logging subsystem initialized");
+
+ // bootstrap the runtime
+ let ex = Arc::new(Executor::new());
+ let rt_ex = ex.clone();
+ let rt = thread::spawn(move || smol::block_on(rt_ex.clone().run(future::pending::<()>())));
+ info!("started single-threaded executor");
+
+ // setup the acceptor
+ let address = "127.0.0.1:7777".parse::<SocketAddr>()?;
+ let listener = Async::<TcpListener>::bind(address)?;
+ info!("set up listener on 127.0.0.1:7777");
+
+ let (events_tx, events_rx) = unbounded();
+
+ ex.spawn(acceptor_task(listener, events_tx.clone(), ex.clone()))
+ .detach();
+ ex.spawn(broker_task(events_rx)).detach();
+
+ if let Err(msg) = rt.join() {
+ warn!("executor exited w/ error: {:?}", msg);
+ }
+
+ Ok(())
}
/*
@@ 47,89 49,88 @@ fn main() -> anyhow::Result<()> {
* which can be used to send events to the server. Once a peer has established
* a registration w/ the server, through `Event::NewPeer { .. }`, the server
* can similarly issue replies to that peer's mailbox arbitrarily.
- *
+ *
*/
async fn broker_task(events: Receiver<Event>) {
- let mut server = Server::new();
- let mut events = events.fuse();
-
- loop {
- select! {
- event = events.next() => match event {
- Some(event) => {
- info!("processing server event: {:?}", event);
- if let Err(msg) = server.process_event(event).await {
- warn!("unexpected server error: {:?}", msg);
- }
- },
-
- None => { panic!("event stream is gone?") },
- },
- }
- }
+ let mut server = Server::new();
+ let mut events = events.fuse();
+
+ loop {
+ select! {
+ event = events.next() => match event {
+ Some(event) => {
+ info!("processing server event: {:?}", event);
+ if let Err(msg) = server.process_event(event).await {
+ warn!("unexpected server error: {:?}", msg);
+ }
+ },
+
+ None => { panic!("event stream is gone?") },
+ },
+ }
+ }
}
-
/*
* The acceptor task is a tight loop over incoming connections.
* When an incoming connection is received we call `accept_client(..)` which
* sets up some channels and spawns a detached connection-specific task.
- *
+ *
* It is imperative that `accept_client(..)` returns quickly & does not block,
* as any clients connecting concurrently will be blocked until that call
* returns control to this task.
*/
-async fn acceptor_task(
- listener: Async<TcpListener>,
- events: Sender<Event>,
- ex: Arc<Executor<'_>>,
-) {
- info!("starting acceptor loop");
-
- loop {
- let (client_stream, client_addr) = match listener.accept().await {
- Ok(client) => client,
- Err(msg) => { warn!("error accepting client: {:?}", msg); continue },
- };
-
- let events = events.clone(); // clone events outbox so we can move into the client's task
- let client_ex = ex.clone();
- ex.spawn(async move {
- if let Err(msg) = accept_client(client_stream, client_addr, events, client_ex).await {
- warn!("warning: failed to accept client. {:?}", msg);
- }
- }).detach();
- }
+async fn acceptor_task(listener: Async<TcpListener>, events: Sender<Event>, ex: Arc<Executor<'_>>) {
+ info!("starting acceptor loop");
+
+ loop {
+ let (client_stream, client_addr) = match listener.accept().await {
+ Ok(client) => client,
+ Err(msg) => {
+ warn!("error accepting client: {:?}", msg);
+ continue;
+ }
+ };
+
+ let events = events.clone(); // clone events outbox so we can move into the client's task
+ let client_ex = ex.clone();
+ ex.spawn(async move {
+ if let Err(msg) = accept_client(client_stream, client_addr, events, client_ex).await {
+ warn!("warning: failed to accept client. {:?}", msg);
+ }
+ })
+ .detach();
+ }
}
/**
* Sets up mailboxes for a client and begins reading packets from the client's
* incoming TCP socket. Control is returned to the caller once the mailboxes
* are established and the task has been sent to the async runtime.
- *
+ *
* Two tasks are spawned:
- *
+ *
* 1. The packet decoding task reads the incoming stream and translates
* it into `Packet { .. }` objects.
- *
+ *
* 2. The client event loop which selects over events from the server as well
* as incoming packets from the wire. These are dispatched to the `Client { .. }`
* which is esentially a state machine representing the connected client.
*/
async fn accept_client(
- stream: Async<TcpStream>,
- addr: SocketAddr,
- events: Sender<Event>,
- ex: Arc<Executor<'_>>,
+ stream: Async<TcpStream>,
+ addr: SocketAddr,
+ events: Sender<Event>,
+ ex: Arc<Executor<'_>>,
) -> anyhow::Result<()> {
- info!("accepting client w/ ip: {}", addr);
- let stream = Arc::new(stream);
- let (packets_tx, packets_rx) = unbounded();
- let (tcp_quit_tx, tcp_quit_rx) = unbounded::<void::Void>();
-
- // peer event loop
- let peer_ex = ex.clone();
- ex.spawn(async move {
+ info!("accepting client w/ ip: {}", addr);
+ let stream = Arc::new(stream);
+ let (packets_tx, packets_rx) = unbounded();
+ let (tcp_quit_tx, tcp_quit_rx) = unbounded::<void::Void>();
+
+ // peer event loop
+ let peer_ex = ex.clone();
+ ex.spawn(async move {
// incoming server connections
let (mut peer, peer_rx) = Peer::with_stream(&events, &stream);
let mut peer_events = peer_rx.fuse(); // stream of events from server
@@ 140,7 141,7 @@ async fn accept_client(
peer_ex.spawn(async move {
loop {
match util::read_packet(&mut stream.clone()).await {
- Ok(packet) => {
+ Ok(packet) => {
if let Err(_) = packets_tx.send(packet).await {
panic!("BUG: sent packet but connection event loop is gone?");
}
@@ 149,7 150,7 @@ async fn accept_client(
Err(msg) => {
warn!("malformed packet from client: {:?}", msg);
break; // TODO: kill the client?
- },
+ },
}
}
@@ 189,5 190,5 @@ async fn accept_client(
}
}).detach();
- Ok(()) // created peer event loop a-ok
+ Ok(()) // created peer event loop a-ok
}
M smolboi/src/lib.rs => smolboi/src/lib.rs +5 -3
@@ 1,6 1,8 @@
-#[macro_use] extern crate log;
-
pub mod peer;
pub mod protocol;
pub mod server;
-pub mod util;>
\ No newline at end of file
+pub mod util;
+
+mod prelude {
+ pub use log::{debug, error, info, warn};
+}
M smolboi/src/peer.rs => smolboi/src/peer.rs +189 -171
@@ 1,6 1,7 @@
-use crate::protocol::{Event, Target};
use crate::protocol::packet::Packet;
-use crate::util;
+use crate::protocol::{Event, Target};
+use crate::{prelude::*, util};
+
use std::net::TcpStream;
use async_channel::{unbounded, Receiver, Sender};
@@ 10,183 11,200 @@ use thiserror::Error;
#[derive(Debug, Error)]
enum PeerError {
- #[error("This event cannot be processed by a peer.")]
- UnsupportedEvent,
+ #[error("This event cannot be processed by a peer.")]
+ UnsupportedEvent,
- #[error("This packet type cannot be sent by a peer.")]
- UnsupportedPacket,
+ #[error("This packet type cannot be sent by a peer.")]
+ UnsupportedPacket,
- #[error("This packet cannot be sent by a peer at this time.")]
- InvalidSequence,
+ #[error("This packet cannot be sent by a peer at this time.")]
+ InvalidSequence,
}
-
enum PeerState {
- Initialized,
- Registered,
+ Initialized,
+ Registered,
}
pub struct Peer {
- broker: Sender<Event>,
- event_inbox: Sender<Event>,
- name: Option<String>,
- state: PeerState,
- stream: Arc<Async<TcpStream>>,
+ broker: Sender<Event>,
+ event_inbox: Sender<Event>,
+ name: Option<String>,
+ state: PeerState,
+ stream: Arc<Async<TcpStream>>,
}
impl Peer {
- pub fn with_stream(broker: &Sender<Event>, stream: &Arc<Async<TcpStream>>) -> (Self, Receiver<Event>) {
- let (events_tx, events_rx) = unbounded();
-
- let peer = Self {
- broker: broker.clone(),
- event_inbox: events_tx, // hold reference to this so we dont' die prematurely?
- name: None,
- state: PeerState::Initialized,
- stream: stream.clone(),
- };
-
- (peer, events_rx)
- }
-
- pub async fn process_packet(&mut self, packet: Packet) -> anyhow::Result<()> {
- match self.state {
- PeerState::Initialized => Ok(self.process_packet_initialized(packet).await?),
- PeerState::Registered => Ok(self.process_packet_registered(packet).await?),
- }
- }
-
- async fn process_packet_initialized(&mut self, packet: Packet) -> anyhow::Result<()> {
- match packet {
- Packet::Register { name } => {
- Ok(self.broker.send(Event::NewPeer { user: name, events: self.event_inbox.clone() }).await?)
- },
-
- Packet::Join { .. }
- | Packet::Part { .. }
- | Packet::MessageRoom { .. } => { Err(PeerError::InvalidSequence.into()) },
-
-
- Packet::Disconnect
- | Packet::Notice { .. }
- | Packet::RegistrationAccepted { .. }
- | Packet::RegistrationRejected => Err(PeerError::UnsupportedPacket.into()),
- }
- }
-
- async fn process_packet_registered(&mut self, packet: Packet) -> anyhow::Result<()> {
- match packet {
- Packet::Register { .. } => { Err(PeerError::InvalidSequence.into()) },
-
- Packet::Join { room } => {
- let join_event = Event::Join {
- user: self.name.clone().unwrap(),
- room: room,
- };
-
- Ok(self.broker.send(join_event).await?)
- },
-
- Packet::Part { room } => {
- let part_event = Event::Part {
- user: self.name.clone().unwrap(),
- room: room,
- };
-
- Ok(self.broker.send(part_event).await?)
- },
-
- Packet::MessageRoom { room, from, body } => {
- let message_event = Event::Message {
- to: Target::Room(room),
- from: Target::User(from),
- body: body,
- };
-
- Ok(self.broker.send(message_event).await?)
- },
-
- Packet::Disconnect
- | Packet::Notice { .. }
- | Packet::RegistrationAccepted { .. }
- | Packet::RegistrationRejected => Err(PeerError::UnsupportedPacket.into()),
- }
- }
-
- pub async fn process_event(&mut self, event: Event) -> anyhow::Result<()> {
- match self.state {
- PeerState::Initialized => Ok(self.process_event_initialized(event).await?),
- PeerState::Registered => Ok(self.process_event_registered(event).await?),
- }
- }
-
- async fn process_event_initialized(&mut self, event: Event) -> anyhow::Result<()> {
- match event {
- Event::AcceptedRegistration { name } => {
- info!("sending accepted registration ...");
- self.state = PeerState::Registered;
- self.name = Some(name.clone());
- let packet = Packet::RegistrationAccepted { name: name };
- Ok(util::write_packet(&mut self.stream, packet).await?)
- },
-
- Event::RejectedRegistration => {
- Ok(util::write_packet(&mut self.stream, Packet::RegistrationRejected).await?)
- },
-
- Event::Message { .. }
- | Event::Join { .. }
- | Event::Part { .. } => { Err(PeerError::InvalidSequence.into()) },
-
- Event::Disconnection { .. }
- | Event::NewPeer { .. }
- | Event::Registration { .. } => { Err(PeerError::UnsupportedEvent.into()) },
- }
- }
-
- async fn process_event_registered(&mut self, event: Event) -> anyhow::Result<()> {
- match event {
- // room message
- Event::Message { from: Target::User(from), to: Target::Room(to), body } => {
- let message_packet = Packet::MessageRoom {
- room: to,
- from: from,
- body: body,
- };
-
- Ok(util::write_packet(&mut self.stream, message_packet).await?)
- },
-
- // system message
- Event::Message { from: Target::Connection, to: Target::Connection, body } => {
- info!("sending packet notice");
- Ok(util::write_packet(&mut self.stream, Packet::Notice { body }).await?)
- },
-
- Event::Message { .. } => {
- warn!("unhandled message type: {:?}", event);
- Ok(())
- },
-
- Event::AcceptedRegistration { .. }
- | Event::RejectedRegistration => { Err(PeerError::InvalidSequence.into()) },
-
- Event::Disconnection { .. }
- | Event::NewPeer { .. }
- | Event::Join { .. }
- | Event::Part { .. }
- | Event::Registration { .. } => { Err(PeerError::UnsupportedEvent.into()) },
- }
- }
-
-
-
- pub async fn disconnect(&mut self) -> anyhow::Result<()> {
- if self.name.is_none() {
- warn!("disconnection received before peer was registered?");
- return Ok(());
- }
-
- Ok(self.broker.send(Event::Disconnection { user: self.name.clone().unwrap() }).await?)
- }
-}>
\ No newline at end of file
+ pub fn with_stream(
+ broker: &Sender<Event>,
+ stream: &Arc<Async<TcpStream>>,
+ ) -> (Self, Receiver<Event>) {
+ let (events_tx, events_rx) = unbounded();
+
+ let peer = Self {
+ broker: broker.clone(),
+ event_inbox: events_tx, // hold reference to this so we dont' die prematurely?
+ name: None,
+ state: PeerState::Initialized,
+ stream: stream.clone(),
+ };
+
+ (peer, events_rx)
+ }
+
+ pub async fn process_packet(&mut self, packet: Packet) -> anyhow::Result<()> {
+ match self.state {
+ PeerState::Initialized => Ok(self.process_packet_initialized(packet).await?),
+ PeerState::Registered => Ok(self.process_packet_registered(packet).await?),
+ }
+ }
+
+ async fn process_packet_initialized(&mut self, packet: Packet) -> anyhow::Result<()> {
+ match packet {
+ Packet::Register { name } => Ok(self
+ .broker
+ .send(Event::NewPeer {
+ user: name,
+ events: self.event_inbox.clone(),
+ })
+ .await?),
+
+ Packet::Join { .. } | Packet::Part { .. } | Packet::MessageRoom { .. } => {
+ Err(PeerError::InvalidSequence.into())
+ }
+
+ Packet::Disconnect
+ | Packet::Notice { .. }
+ | Packet::RegistrationAccepted { .. }
+ | Packet::RegistrationRejected => Err(PeerError::UnsupportedPacket.into()),
+ }
+ }
+
+ async fn process_packet_registered(&mut self, packet: Packet) -> anyhow::Result<()> {
+ match packet {
+ Packet::Register { .. } => Err(PeerError::InvalidSequence.into()),
+
+ Packet::Join { room } => {
+ let join_event = Event::Join {
+ user: self.name.clone().unwrap(),
+ room: room,
+ };
+
+ Ok(self.broker.send(join_event).await?)
+ }
+
+ Packet::Part { room } => {
+ let part_event = Event::Part {
+ user: self.name.clone().unwrap(),
+ room: room,
+ };
+
+ Ok(self.broker.send(part_event).await?)
+ }
+
+ Packet::MessageRoom { room, from, body } => {
+ let message_event = Event::Message {
+ to: Target::Room(room),
+ from: Target::User(from),
+ body: body,
+ };
+
+ Ok(self.broker.send(message_event).await?)
+ }
+
+ Packet::Disconnect
+ | Packet::Notice { .. }
+ | Packet::RegistrationAccepted { .. }
+ | Packet::RegistrationRejected => Err(PeerError::UnsupportedPacket.into()),
+ }
+ }
+
+ pub async fn process_event(&mut self, event: Event) -> anyhow::Result<()> {
+ match self.state {
+ PeerState::Initialized => Ok(self.process_event_initialized(event).await?),
+ PeerState::Registered => Ok(self.process_event_registered(event).await?),
+ }
+ }
+
+ async fn process_event_initialized(&mut self, event: Event) -> anyhow::Result<()> {
+ match event {
+ Event::AcceptedRegistration { name } => {
+ info!("sending accepted registration ...");
+ self.state = PeerState::Registered;
+ self.name = Some(name.clone());
+ let packet = Packet::RegistrationAccepted { name: name };
+ Ok(util::write_packet(&mut self.stream, packet).await?)
+ }
+
+ Event::RejectedRegistration => {
+ Ok(util::write_packet(&mut self.stream, Packet::RegistrationRejected).await?)
+ }
+
+ Event::Message { .. } | Event::Join { .. } | Event::Part { .. } => {
+ Err(PeerError::InvalidSequence.into())
+ }
+
+ Event::Disconnection { .. } | Event::NewPeer { .. } | Event::Registration { .. } => {
+ Err(PeerError::UnsupportedEvent.into())
+ }
+ }
+ }
+
+ async fn process_event_registered(&mut self, event: Event) -> anyhow::Result<()> {
+ match event {
+ // room message
+ Event::Message {
+ from: Target::User(from),
+ to: Target::Room(to),
+ body,
+ } => {
+ let message_packet = Packet::MessageRoom {
+ room: to,
+ from: from,
+ body: body,
+ };
+
+ Ok(util::write_packet(&mut self.stream, message_packet).await?)
+ }
+
+ // system message
+ Event::Message {
+ from: Target::Connection,
+ to: Target::Connection,
+ body,
+ } => {
+ info!("sending packet notice");
+ Ok(util::write_packet(&mut self.stream, Packet::Notice { body }).await?)
+ }
+
+ Event::Message { .. } => {
+ warn!("unhandled message type: {:?}", event);
+ Ok(())
+ }
+
+ Event::AcceptedRegistration { .. } | Event::RejectedRegistration => {
+ Err(PeerError::InvalidSequence.into())
+ }
+
+ Event::Disconnection { .. }
+ | Event::NewPeer { .. }
+ | Event::Join { .. }
+ | Event::Part { .. }
+ | Event::Registration { .. } => Err(PeerError::UnsupportedEvent.into()),
+ }
+ }
+
+ pub async fn disconnect(&mut self) -> anyhow::Result<()> {
+ if self.name.is_none() {
+ warn!("disconnection received before peer was registered?");
+ return Ok(());
+ }
+
+ Ok(self
+ .broker
+ .send(Event::Disconnection {
+ user: self.name.clone().unwrap(),
+ })
+ .await?)
+ }
+}
M smolboi/src/protocol.rs => smolboi/src/protocol.rs +373 -334
@@ 6,349 6,388 @@ pub type Result<T> = std::result::Result<T, ProtocolError>;
#[derive(Debug, Error)]
pub enum ProtocolError {
- /// Command does not exist, or is otherwise malformed.
- #[error("incorrect syntax for command")]
- InvalidCommand,
-
- /// Text was expected but it did not decode cleanly into a UTF-8 `String`.
- #[error("expected string - did not decode to valid utf-8.")]
- InvalidEncoding,
-
- /// Command was decoded successfully, but is not allowed during the current connection state.
- #[error("command was understood but is not legal at this time.")]
- InvalidSequence,
-
- /// Command was decoded successfully but a required field is missing.
- #[error("expected field {} but stream ended prematurely.", field)]
- MissingField { field: &'static str },
+ /// Command does not exist, or is otherwise malformed.
+ #[error("incorrect syntax for command")]
+ InvalidCommand,
+
+ /// Text was expected but it did not decode cleanly into a UTF-8 `String`.
+ #[error("expected string - did not decode to valid utf-8.")]
+ InvalidEncoding,
+
+ /// Command was decoded successfully, but is not allowed during the current connection state.
+ #[error("command was understood but is not legal at this time.")]
+ InvalidSequence,
+
+ /// Command was decoded successfully but a required field is missing.
+ #[error("expected field {} but stream ended prematurely.", field)]
+ MissingField { field: &'static str },
}
#[derive(Clone, Debug)]
pub enum Event {
- /// Attempts to register peer's username & event-stream w/ the server
- NewPeer { user: String, events: Sender<Event> },
-
- /// Notify server that the (registered) peer is disconnected.
- Disconnection { user: String },
-
- /// Request to subscribe a user to a room's message stream.
- Join { user: String, room: String },
-
- /// Request to remove a user from the room's subscribers.
- Part { user: String, room: String },
-
- /// Publish a message as a user to a room's message stream.
- Message { from: Target, to: Target, body: String },
-
- /// Client is requesting to select the indicated username.
- Registration { name: String },
-
- /// Server accepted the registration of this peer.
- AcceptedRegistration { name: String },
-
- /// Server rejecected registration of this peer.
- RejectedRegistration,
+ /// Attempts to register peer's username & event-stream w/ the server
+ NewPeer { user: String, events: Sender<Event> },
+
+ /// Notify server that the (registered) peer is disconnected.
+ Disconnection { user: String },
+
+ /// Request to subscribe a user to a room's message stream.
+ Join { user: String, room: String },
+
+ /// Request to remove a user from the room's subscribers.
+ Part { user: String, room: String },
+
+ /// Publish a message as a user to a room's message stream.
+ Message {
+ from: Target,
+ to: Target,
+ body: String,
+ },
+
+ /// Client is requesting to select the indicated username.
+ Registration { name: String },
+
+ /// Server accepted the registration of this peer.
+ AcceptedRegistration { name: String },
+
+ /// Server rejecected registration of this peer.
+ RejectedRegistration,
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Target {
- Room(String),
- User(String),
- Connection,
+ Room(String),
+ User(String),
+ Connection,
}
pub mod packet {
- use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian};
- use thiserror::Error;
- use std::io::{self, Cursor, Read, Write};
-
- #[derive(Debug, Error)]
- pub enum Error {
- #[error(transparent)]
- Io {
- #[from]
- source: io::Error,
- },
-
- #[error(transparent)]
- TextEncoding {
- #[from]
- source: std::string::FromUtf8Error,
- },
-
- #[error("unknown packet type: {}", packet_ty)]
- UnknownPacket { packet_ty: u8 },
- }
- #[derive(Clone, Debug, Eq, PartialEq)]
- pub enum Packet {
- Register { name: String },
-
- RegistrationAccepted { name: String },
-
- RegistrationRejected,
-
- Disconnect,
-
- Join { room: String },
-
- Part { room: String },
-
- MessageRoom { room: String, from: String, body: String },
-
- Notice { body: String },
- }
-
- impl Packet {
-
- pub fn encode(&self) -> Result<Vec<u8>, Error> {
- match self {
- Packet::Register { name } => Packet::encode_registration(&name),
- Packet::RegistrationAccepted { name } => Packet::encode_registration_accepted(&name),
- Packet::RegistrationRejected => Packet::encode_registration_rejected(),
- Packet::Disconnect => Packet::encode_disconect(),
-
- Packet::Join { room }=> Packet::encode_join_room(&room),
- Packet::Part { room }=> Packet::encode_part_room(&room),
- Packet::MessageRoom { room, from, body } => Packet::encode_message_room(&room, &from, &body),
- Packet::Notice { body } => Packet::encode_notice(&body),
- }
- }
-
- fn encode_registration(name: &str) -> Result<Vec<u8>, Error> {
- let mut buf = vec![];
- buf.write_u8(0x00)?;
-
- assert!(name.len() <= u16::MAX.into());
- buf.write_u16::<NetworkEndian>(name.len() as u16)?;
- buf.write(name.as_bytes())?;
-
- Ok(buf)
- }
-
- fn encode_registration_accepted(name: &str) -> Result<Vec<u8>, Error> {
- let mut buf = vec![];
- buf.write_u8(0x01)?;
-
- assert!(name.len() <= u16::MAX.into());
- buf.write_u16::<NetworkEndian>(name.len() as u16)?;
- buf.write(name.as_bytes())?;
-
- Ok(buf)
- }
-
- fn encode_registration_rejected() -> Result<Vec<u8>, Error> {
- let mut buf = vec![];
- buf.write_u8(0x02)?;
- Ok(buf)
- }
-
- fn encode_disconect() -> Result<Vec<u8>, Error> {
- let mut buf = vec![];
- buf.write_u8(0x03)?;
- Ok(buf)
- }
-
- fn encode_join_room(room: &str) -> Result<Vec<u8>, Error> {
- let mut buf = vec![];
- buf.write_u8(0x10)?;
-
- assert!(room.len() <= u16::MAX.into());
- buf.write_u16::<NetworkEndian>(room.len() as u16)?;
- buf.write(room.as_bytes())?;
-
- Ok(buf)
- }
-
- fn encode_part_room(room: &str) -> Result<Vec<u8>, Error> {
- let mut buf = vec![];
- buf.write_u8(0x11)?;
-
- assert!(room.len() <= u16::MAX.into());
- buf.write_u16::<NetworkEndian>(room.len() as u16)?;
- buf.write(room.as_bytes())?;
-
- Ok(buf)
- }
-
- fn encode_message_room(room: &str, from: &str, body: &str) -> Result<Vec<u8>, Error> {
- let mut buf = vec![];
- buf.write_u8(0x20)?;
-
- assert!(room.len() <= u16::MAX.into());
- buf.write_u16::<NetworkEndian>(room.len() as u16)?;
- buf.write(room.as_bytes())?;
-
- assert!(from.len() <= u16::MAX.into());
- buf.write_u16::<NetworkEndian>(from.len() as u16)?;
- buf.write(from.as_bytes())?;
-
- assert!(body.len() <= u16::MAX.into());
- buf.write_u16::<NetworkEndian>(body.len() as u16)?;
- buf.write(body.as_bytes())?;
-
- Ok(buf)
- }
-
- fn encode_notice(body: &str) -> Result<Vec<u8>, Error> {
- let mut buf = vec![];
- buf.write_u8(0x21)?;
-
- assert!(body.len() <= u16::MAX.into());
- buf.write_u16::<NetworkEndian>(body.len() as u16)?;
- buf.write(body.as_bytes())?;
-
- Ok(buf)
- }
-
- /// Decodes a byte stream into a packet
- pub fn decode(byte_buf: &[u8]) -> Result<Packet, Error> {
- let mut cursor = Cursor::new(byte_buf);
- let packet_ty = cursor.read_u8()?;
-
- match packet_ty {
- 0x00 => Packet::decode_registration(&mut cursor),
- 0x01 => Packet::decode_accept_registration(&mut cursor),
- 0x02 => Packet::decode_reject_registration(&mut cursor),
- 0x03 => Packet::decode_disconnect(&mut cursor),
-
- 0x10 => Packet::decode_join_room(&mut cursor),
- 0x11 => Packet::decode_part_room(&mut cursor),
-
- 0x20 => Packet::decode_message_room(&mut cursor),
- 0x21 => Packet::decode_notice(&mut cursor),
- _ => return Err(Error::UnknownPacket { packet_ty: packet_ty }.into()),
- }
- }
-
- fn decode_registration(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
- let name_len = buf.read_u16::<NetworkEndian>()?;
- let mut name_buf = vec![0u8; name_len.into()];
- buf.read_exact(&mut name_buf)?;
- let name_str = String::from_utf8(name_buf)?;
- Ok(Packet::Register { name: name_str })
- }
-
- fn decode_accept_registration(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
- let name_len = buf.read_u16::<NetworkEndian>()?;
- let mut name_buf = vec![0u8; name_len.into()];
- buf.read_exact(&mut name_buf)?;
- let name_str = String::from_utf8(name_buf)?;
- Ok(Packet::RegistrationAccepted { name: name_str })
- }
-
- fn decode_reject_registration(_buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
- // TODO: assert we are at end of buffer?
- Ok(Packet::RegistrationRejected)
- }
-
- fn decode_disconnect(_buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
- // TODO: assert we are at end of buffer?
- Ok(Packet::Disconnect)
- }
-
- fn decode_join_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
- let len = buf.read_u16::<NetworkEndian>()?;
- let mut room_buf = vec![0u8; len.into()];
- buf.read_exact(&mut room_buf)?;
- let room_str = String::from_utf8(room_buf)?;
-
- Ok(Packet::Join { room: room_str })
- }
-
- fn decode_part_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
- let len = buf.read_u16::<NetworkEndian>()?;
- let mut room_buf = vec![0u8; len.into()];
- buf.read_exact(&mut room_buf)?;
- let room_str = String::from_utf8(room_buf)?;
-
- Ok(Packet::Part { room: room_str })
- }
-
- fn decode_message_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
- let len = buf.read_u16::<NetworkEndian>()?;
- let mut room_buf = vec![0u8; len.into()];
- buf.read_exact(&mut room_buf)?;
- let room_str = String::from_utf8(room_buf)?;
-
- let len = buf.read_u16::<NetworkEndian>()?;
- let mut from_buf = vec![0u8; len.into()];
- buf.read_exact(&mut from_buf)?;
- let from_str = String::from_utf8(from_buf)?;
-
- let len = buf.read_u16::<NetworkEndian>()?;
- let mut body_buf = vec![0u8; len.into()];
- buf.read_exact(&mut body_buf)?;
- let body_str = String::from_utf8(body_buf)?;
-
-
- Ok(Packet::MessageRoom { room: room_str, from: from_str, body: body_str })
- }
-
- fn decode_notice(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
- let len = buf.read_u16::<NetworkEndian>()?;
- let mut body_buf = vec![0u8; len.into()];
- buf.read_exact(&mut body_buf)?;
- let body_str = String::from_utf8(body_buf)?;
-
- Ok(Packet::Notice { body: body_str })
- }
- }
-
- #[cfg(test)]
- mod test {
- use super::Packet;
-
- fn test_roundtrip(pkt: Packet) {
- let wire = pkt.encode();
- assert!(wire.is_ok());
-
- let decoded = Packet::decode(&wire.unwrap());
- assert!(decoded.is_ok());
- assert_eq!(decoded.unwrap(), pkt);
- }
-
- #[test]
- fn roundtrip_register() {
- test_roundtrip(Packet::Register { name: "Chuck Testa".to_owned() });
- }
-
- #[test]
- fn roundtrip_register_accept() {
- test_roundtrip(Packet::RegistrationAccepted { name: "example-user".to_owned() });
- }
-
- #[test]
- fn roundtrip_register_reject() {
- test_roundtrip(Packet::RegistrationRejected);
- }
-
- #[test]
- fn roundtrip_disconnect() {
- test_roundtrip(Packet::Disconnect);
- }
-
- #[test]
- fn roudntrip_join() {
- test_roundtrip(Packet::Join { room: "example-room".to_owned() })
- }
-
- #[test]
- fn roundtrip_part() {
- test_roundtrip(Packet::Part { room: "example-room".to_owned() })
- }
-
- #[test]
- fn roundtrip_msg_room() {
- test_roundtrip(Packet::MessageRoom {
- room: "example-room".to_owned(),
- from: "example-user".to_owned(),
- body: "hello, world".to_owned()
- })
- }
-
- #[test]
- fn roundtrip_notice() {
- test_roundtrip(Packet::Notice { body: "important message from server".to_owned() })
- }
- }
+ use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
+ use std::io::{self, Cursor, Read, Write};
+ use thiserror::Error;
+
+ #[derive(Debug, Error)]
+ pub enum Error {
+ #[error(transparent)]
+ Io {
+ #[from]
+ source: io::Error,
+ },
+
+ #[error(transparent)]
+ TextEncoding {
+ #[from]
+ source: std::string::FromUtf8Error,
+ },
+
+ #[error("unknown packet type: {}", packet_ty)]
+ UnknownPacket { packet_ty: u8 },
+ }
+ #[derive(Clone, Debug, Eq, PartialEq)]
+ pub enum Packet {
+ Register {
+ name: String,
+ },
+
+ RegistrationAccepted {
+ name: String,
+ },
+
+ RegistrationRejected,
+
+ Disconnect,
+
+ Join {
+ room: String,
+ },
+
+ Part {
+ room: String,
+ },
+
+ MessageRoom {
+ room: String,
+ from: String,
+ body: String,
+ },
+
+ Notice {
+ body: String,
+ },
+ }
+
+ impl Packet {
+ pub fn encode(&self) -> Result<Vec<u8>, Error> {
+ match self {
+ Packet::Register { name } => Packet::encode_registration(&name),
+ Packet::RegistrationAccepted { name } => {
+ Packet::encode_registration_accepted(&name)
+ }
+ Packet::RegistrationRejected => Packet::encode_registration_rejected(),
+ Packet::Disconnect => Packet::encode_disconect(),
+
+ Packet::Join { room } => Packet::encode_join_room(&room),
+ Packet::Part { room } => Packet::encode_part_room(&room),
+ Packet::MessageRoom { room, from, body } => {
+ Packet::encode_message_room(&room, &from, &body)
+ }
+ Packet::Notice { body } => Packet::encode_notice(&body),
+ }
+ }
+
+ fn encode_registration(name: &str) -> Result<Vec<u8>, Error> {
+ let mut buf = vec![];
+ buf.write_u8(0x00)?;
+
+ assert!(name.len() <= u16::MAX.into());
+ buf.write_u16::<NetworkEndian>(name.len() as u16)?;
+ buf.write(name.as_bytes())?;
+
+ Ok(buf)
+ }
+
+ fn encode_registration_accepted(name: &str) -> Result<Vec<u8>, Error> {
+ let mut buf = vec![];
+ buf.write_u8(0x01)?;
+
+ assert!(name.len() <= u16::MAX.into());
+ buf.write_u16::<NetworkEndian>(name.len() as u16)?;
+ buf.write(name.as_bytes())?;
+
+ Ok(buf)
+ }
+
+ fn encode_registration_rejected() -> Result<Vec<u8>, Error> {
+ let mut buf = vec![];
+ buf.write_u8(0x02)?;
+ Ok(buf)
+ }
+
+ fn encode_disconect() -> Result<Vec<u8>, Error> {
+ let mut buf = vec![];
+ buf.write_u8(0x03)?;
+ Ok(buf)
+ }
+
+ fn encode_join_room(room: &str) -> Result<Vec<u8>, Error> {
+ let mut buf = vec![];
+ buf.write_u8(0x10)?;
+
+ assert!(room.len() <= u16::MAX.into());
+ buf.write_u16::<NetworkEndian>(room.len() as u16)?;
+ buf.write(room.as_bytes())?;
+
+ Ok(buf)
+ }
+
+ fn encode_part_room(room: &str) -> Result<Vec<u8>, Error> {
+ let mut buf = vec![];
+ buf.write_u8(0x11)?;
+
+ assert!(room.len() <= u16::MAX.into());
+ buf.write_u16::<NetworkEndian>(room.len() as u16)?;
+ buf.write(room.as_bytes())?;
+
+ Ok(buf)
+ }
+
+ fn encode_message_room(room: &str, from: &str, body: &str) -> Result<Vec<u8>, Error> {
+ let mut buf = vec![];
+ buf.write_u8(0x20)?;
+
+ assert!(room.len() <= u16::MAX.into());
+ buf.write_u16::<NetworkEndian>(room.len() as u16)?;
+ buf.write(room.as_bytes())?;
+
+ assert!(from.len() <= u16::MAX.into());
+ buf.write_u16::<NetworkEndian>(from.len() as u16)?;
+ buf.write(from.as_bytes())?;
+
+ assert!(body.len() <= u16::MAX.into());
+ buf.write_u16::<NetworkEndian>(body.len() as u16)?;
+ buf.write(body.as_bytes())?;
+
+ Ok(buf)
+ }
+
+ fn encode_notice(body: &str) -> Result<Vec<u8>, Error> {
+ let mut buf = vec![];
+ buf.write_u8(0x21)?;
+
+ assert!(body.len() <= u16::MAX.into());
+ buf.write_u16::<NetworkEndian>(body.len() as u16)?;
+ buf.write(body.as_bytes())?;
+
+ Ok(buf)
+ }
+
+ /// Decodes a byte stream into a packet
+ pub fn decode(byte_buf: &[u8]) -> Result<Packet, Error> {
+ let mut cursor = Cursor::new(byte_buf);
+ let packet_ty = cursor.read_u8()?;
+
+ match packet_ty {
+ 0x00 => Packet::decode_registration(&mut cursor),
+ 0x01 => Packet::decode_accept_registration(&mut cursor),
+ 0x02 => Packet::decode_reject_registration(&mut cursor),
+ 0x03 => Packet::decode_disconnect(&mut cursor),
+
+ 0x10 => Packet::decode_join_room(&mut cursor),
+ 0x11 => Packet::decode_part_room(&mut cursor),
+
+ 0x20 => Packet::decode_message_room(&mut cursor),
+ 0x21 => Packet::decode_notice(&mut cursor),
+ _ => {
+ return Err(Error::UnknownPacket {
+ packet_ty: packet_ty,
+ }
+ .into())
+ }
+ }
+ }
+
+ fn decode_registration(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
+ let name_len = buf.read_u16::<NetworkEndian>()?;
+ let mut name_buf = vec![0u8; name_len.into()];
+ buf.read_exact(&mut name_buf)?;
+ let name_str = String::from_utf8(name_buf)?;
+ Ok(Packet::Register { name: name_str })
+ }
+
+ fn decode_accept_registration(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
+ let name_len = buf.read_u16::<NetworkEndian>()?;
+ let mut name_buf = vec![0u8; name_len.into()];
+ buf.read_exact(&mut name_buf)?;
+ let name_str = String::from_utf8(name_buf)?;
+ Ok(Packet::RegistrationAccepted { name: name_str })
+ }
+
+ fn decode_reject_registration(_buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
+ // TODO: assert we are at end of buffer?
+ Ok(Packet::RegistrationRejected)
+ }
+
+ fn decode_disconnect(_buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
+ // TODO: assert we are at end of buffer?
+ Ok(Packet::Disconnect)
+ }
+
+ fn decode_join_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
+ let len = buf.read_u16::<NetworkEndian>()?;
+ let mut room_buf = vec![0u8; len.into()];
+ buf.read_exact(&mut room_buf)?;
+ let room_str = String::from_utf8(room_buf)?;
+
+ Ok(Packet::Join { room: room_str })
+ }
+
+ fn decode_part_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
+ let len = buf.read_u16::<NetworkEndian>()?;
+ let mut room_buf = vec![0u8; len.into()];
+ buf.read_exact(&mut room_buf)?;
+ let room_str = String::from_utf8(room_buf)?;
+
+ Ok(Packet::Part { room: room_str })
+ }
+
+ fn decode_message_room(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
+ let len = buf.read_u16::<NetworkEndian>()?;
+ let mut room_buf = vec![0u8; len.into()];
+ buf.read_exact(&mut room_buf)?;
+ let room_str = String::from_utf8(room_buf)?;
+
+ let len = buf.read_u16::<NetworkEndian>()?;
+ let mut from_buf = vec![0u8; len.into()];
+ buf.read_exact(&mut from_buf)?;
+ let from_str = String::from_utf8(from_buf)?;
+
+ let len = buf.read_u16::<NetworkEndian>()?;
+ let mut body_buf = vec![0u8; len.into()];
+ buf.read_exact(&mut body_buf)?;
+ let body_str = String::from_utf8(body_buf)?;
+
+ Ok(Packet::MessageRoom {
+ room: room_str,
+ from: from_str,
+ body: body_str,
+ })
+ }
+
+ fn decode_notice(buf: &mut Cursor<&[u8]>) -> Result<Packet, Error> {
+ let len = buf.read_u16::<NetworkEndian>()?;
+ let mut body_buf = vec![0u8; len.into()];
+ buf.read_exact(&mut body_buf)?;
+ let body_str = String::from_utf8(body_buf)?;
+
+ Ok(Packet::Notice { body: body_str })
+ }
+ }
+
+ #[cfg(test)]
+ mod test {
+ use super::Packet;
+
+ fn test_roundtrip(pkt: Packet) {
+ let wire = pkt.encode();
+ assert!(wire.is_ok());
+
+ let decoded = Packet::decode(&wire.unwrap());
+ assert!(decoded.is_ok());
+ assert_eq!(decoded.unwrap(), pkt);
+ }
+
+ #[test]
+ fn roundtrip_register() {
+ test_roundtrip(Packet::Register {
+ name: "Chuck Testa".to_owned(),
+ });
+ }
+
+ #[test]
+ fn roundtrip_register_accept() {
+ test_roundtrip(Packet::RegistrationAccepted {
+ name: "example-user".to_owned(),
+ });
+ }
+
+ #[test]
+ fn roundtrip_register_reject() {
+ test_roundtrip(Packet::RegistrationRejected);
+ }
+
+ #[test]
+ fn roundtrip_disconnect() {
+ test_roundtrip(Packet::Disconnect);
+ }
+
+ #[test]
+ fn roudntrip_join() {
+ test_roundtrip(Packet::Join {
+ room: "example-room".to_owned(),
+ })
+ }
+
+ #[test]
+ fn roundtrip_part() {
+ test_roundtrip(Packet::Part {
+ room: "example-room".to_owned(),
+ })
+ }
+
+ #[test]
+ fn roundtrip_msg_room() {
+ test_roundtrip(Packet::MessageRoom {
+ room: "example-room".to_owned(),
+ from: "example-user".to_owned(),
+ body: "hello, world".to_owned(),
+ })
+ }
+
+ #[test]
+ fn roundtrip_notice() {
+ test_roundtrip(Packet::Notice {
+ body: "important message from server".to_owned(),
+ })
+ }
+ }
}
M smolboi/src/server.rs => smolboi/src/server.rs +140 -139
@@ 1,3 1,4 @@
+use crate::prelude::*;
use crate::protocol::{Event, Target};
use async_channel::Sender;
@@ 9,150 10,150 @@ type UserMap = HashMap<String, Sender<Event>>;
type RoomMap = HashMap<String, UserMap>;
pub struct Server {
- users: UserMap,
- rooms: RoomMap,
-
+ users: UserMap,
+ rooms: RoomMap,
}
#[derive(Debug, Error)]
enum ServerError {
- #[error("server could not find the peer specified")]
- UnknownPeer,
-
- #[error("server could not find the room specified")]
- UnknownRoom,
+ #[error("server could not find the peer specified")]
+ UnknownPeer,
+ #[error("server could not find the room specified")]
+ UnknownRoom,
}
impl Server {
- pub fn new() -> Self {
- Self {
- users: HashMap::new(),
- rooms: HashMap::new(),
- }
- }
-
- pub async fn process_event(&mut self, event: Event) -> anyhow::Result<()> {
- match event {
- Event::NewPeer { user, events } => {
- match self.users.entry(user) {
- Entry::Occupied(_) => {
- events.send(Event::RejectedRegistration).await?;
- Ok(())
- },
-
- Entry::Vacant(slot) => {
- let peer_name = slot.key().clone();
- slot.insert(events.clone());
- events.send(Event::AcceptedRegistration { name: peer_name }).await?;
- Ok(())
- }
- }
- },
-
- Event::Join { room, user } => {
- let peer = self.users.get(&user).ok_or(ServerError::UnknownPeer)?;
-
- match self.rooms.entry(room) {
- Entry::Vacant(slot) => {
- info!("creating room: {}", slot.key());
- let mut room_users = HashMap::new();
- room_users.insert(user, peer.clone());
- slot.insert(room_users);
- Ok(())
- },
-
- Entry::Occupied(mut slot) => {
- info!("joining existing room: {}", slot.key());
- slot.get_mut().insert(user, peer.clone());
- Ok(())
- },
- }
-
- },
-
- Event::Part { room, user } => {
- self.users.get(&user).ok_or(ServerError::UnknownPeer)?;
-
- match self.rooms.entry(room) {
- Entry::Vacant(slot) => {
- warn!("user tried to part non-existent room? {}", slot.key());
- Err(ServerError::UnknownPeer.into())
- },
-
- Entry::Occupied(mut slot) => {
- info!("user {} leaving {}", user, slot.key());
- if let None = slot.get_mut().remove(&user) {
- warn!("non-existent user {} parting {}", user, slot.key())
- }
-
- Ok(())
- }
- }
- },
-
-
- Event::Message { from: Target::User(ref from), to: Target::Room(ref to), .. } => {
- // just give up if we got a message from a peer we don't know about
- let peer = self.users.get(from).ok_or(ServerError::UnknownPeer)?;
-
- // check that the room exists & give client a message if it does not ...
- match self.rooms.get(to) {
- Some(room) => {
- // check that the user is in the room
- if !room.contains_key(from) {
- let event = Event::Message {
- from: Target::Connection,
- to: Target::Connection,
- body: format!("you are not a member of the room: {}", to),
- };
-
- // tell the peer they lack membership
- // TODO: should this be its own packet type instead of a global message?
- peer.send(event).await?;
- }
-
- // TODO: any way to parallelize this? i.e: rayon?
- for (_peer_name, peer_socket) in room.iter() {
- peer_socket.send(event.clone()).await?
- }
- },
-
- None => {
- let event = Event::Message {
- from: Target::Connection,
- to: Target::Connection,
- body: format!("you are not a member of the room: {}", to),
- };
-
- peer.send(event).await?;
- return Err(ServerError::UnknownRoom.into());
- },
- }
-
- Ok(())
- },
-
- Event::Disconnection { user } => {
- let peer = self.users.remove(&user);
- if let Some(_) = peer {
- debug!("removing {} from server directory", user);
- }
-
- for (room, room_users) in self.rooms.iter_mut() {
- // TODO: send disconnection notice to users in same channel?
- if let Some(_) = room_users.remove(&user) {
- debug!("removing {} from {}", user, room);
- }
- }
-
- Ok(())
- },
-
- event => {
- warn!("unhandled event: {:?}", event);
- Ok(())
- },
- }
- }
-}>
\ No newline at end of file
+ pub fn new() -> Self {
+ Self {
+ users: HashMap::new(),
+ rooms: HashMap::new(),
+ }
+ }
+
+ pub async fn process_event(&mut self, event: Event) -> anyhow::Result<()> {
+ match event {
+ Event::NewPeer { user, events } => match self.users.entry(user) {
+ Entry::Occupied(_) => {
+ events.send(Event::RejectedRegistration).await?;
+ Ok(())
+ }
+
+ Entry::Vacant(slot) => {
+ let peer_name = slot.key().clone();
+ slot.insert(events.clone());
+ events
+ .send(Event::AcceptedRegistration { name: peer_name })
+ .await?;
+ Ok(())
+ }
+ },
+
+ Event::Join { room, user } => {
+ let peer = self.users.get(&user).ok_or(ServerError::UnknownPeer)?;
+
+ match self.rooms.entry(room) {
+ Entry::Vacant(slot) => {
+ info!("creating room: {}", slot.key());
+ let mut room_users = HashMap::new();
+ room_users.insert(user, peer.clone());
+ slot.insert(room_users);
+ Ok(())
+ }
+
+ Entry::Occupied(mut slot) => {
+ info!("joining existing room: {}", slot.key());
+ slot.get_mut().insert(user, peer.clone());
+ Ok(())
+ }
+ }
+ }
+
+ Event::Part { room, user } => {
+ self.users.get(&user).ok_or(ServerError::UnknownPeer)?;
+
+ match self.rooms.entry(room) {
+ Entry::Vacant(slot) => {
+ warn!("user tried to part non-existent room? {}", slot.key());
+ Err(ServerError::UnknownPeer.into())
+ }
+
+ Entry::Occupied(mut slot) => {
+ info!("user {} leaving {}", user, slot.key());
+ if let None = slot.get_mut().remove(&user) {
+ warn!("non-existent user {} parting {}", user, slot.key())
+ }
+
+ Ok(())
+ }
+ }
+ }
+
+ Event::Message {
+ from: Target::User(ref from),
+ to: Target::Room(ref to),
+ ..
+ } => {
+ // just give up if we got a message from a peer we don't know about
+ let peer = self.users.get(from).ok_or(ServerError::UnknownPeer)?;
+
+ // check that the room exists & give client a message if it does not ...
+ match self.rooms.get(to) {
+ Some(room) => {
+ // check that the user is in the room
+ if !room.contains_key(from) {
+ let event = Event::Message {
+ from: Target::Connection,
+ to: Target::Connection,
+ body: format!("you are not a member of the room: {}", to),
+ };
+
+ // tell the peer they lack membership
+ // TODO: should this be its own packet type instead of a global message?
+ peer.send(event).await?;
+ }
+
+ // TODO: any way to parallelize this? i.e: rayon?
+ for (_peer_name, peer_socket) in room.iter() {
+ peer_socket.send(event.clone()).await?
+ }
+ }
+
+ None => {
+ let event = Event::Message {
+ from: Target::Connection,
+ to: Target::Connection,
+ body: format!("you are not a member of the room: {}", to),
+ };
+
+ peer.send(event).await?;
+ return Err(ServerError::UnknownRoom.into());
+ }
+ }
+
+ Ok(())
+ }
+
+ Event::Disconnection { user } => {
+ let peer = self.users.remove(&user);
+ if let Some(_) = peer {
+ debug!("removing {} from server directory", user);
+ }
+
+ for (room, room_users) in self.rooms.iter_mut() {
+ // TODO: send disconnection notice to users in same channel?
+ if let Some(_) = room_users.remove(&user) {
+ debug!("removing {} from {}", user, room);
+ }
+ }
+
+ Ok(())
+ }
+
+ event => {
+ warn!("unhandled event: {:?}", event);
+ Ok(())
+ }
+ }
+ }
+}
M smolboi/src/util.rs => smolboi/src/util.rs +38 -33
@@ 1,58 1,63 @@
-use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian};
+use byteorder::{NetworkEndian, ReadBytesExt, WriteBytesExt};
use futures_util::{AsyncReadExt, AsyncWriteExt};
use crate::protocol::packet::Packet;
use std::io::{Read, Write};
-pub async fn write_packet<W: AsyncWriteExt + Unpin> (wtr: &mut W, packet: Packet) -> anyhow::Result<()> {
- let packet_buf = packet.encode()?;
- let mut packet_sz = [0u8; 2];
+pub async fn write_packet<W: AsyncWriteExt + Unpin>(
+ wtr: &mut W,
+ packet: Packet,
+) -> anyhow::Result<()> {
+ let packet_buf = packet.encode()?;
+ let mut packet_sz = [0u8; 2];
- assert!(packet_buf.len() < u16::MAX.into());
- (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;
+ assert!(packet_buf.len() < u16::MAX.into());
+ (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;
- wtr.write(&packet_sz).await?;
- wtr.write(&packet_buf).await?;
+ wtr.write(&packet_sz).await?;
+ wtr.write(&packet_buf).await?;
- Ok(())
+ Ok(())
}
-pub fn write_packet_sync<W: Write> (wtr: &mut W, packet: Packet) -> anyhow::Result<()> {
- let packet_buf = packet.encode()?;
- let mut packet_sz = [0u8; 2];
+pub fn write_packet_sync<W: Write>(wtr: &mut W, packet: Packet) -> anyhow::Result<()> {
+ let packet_buf = packet.encode()?;
+ let mut packet_sz = [0u8; 2];
- assert!(packet_buf.len() < u16::MAX.into());
- (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;
+ assert!(packet_buf.len() < u16::MAX.into());
+ (&mut packet_sz[..]).write_u16::<NetworkEndian>(packet_buf.len() as u16)?;
- wtr.write(&packet_sz)?;
- wtr.write(&packet_buf)?;
+ wtr.write(&packet_sz)?;
+ wtr.write(&packet_buf)?;
- Ok(())
+ Ok(())
}
-pub async fn read_packet<R>(rdr: &mut R) -> anyhow::Result<Packet>
-where R: AsyncReadExt + Unpin {
- let mut packet_header_buf = [0u8; 2];
- rdr.read_exact(&mut packet_header_buf).await?;
+pub async fn read_packet<R>(rdr: &mut R) -> anyhow::Result<Packet>
+where
+ R: AsyncReadExt + Unpin,
+{
+ let mut packet_header_buf = [0u8; 2];
+ rdr.read_exact(&mut packet_header_buf).await?;
- let mut cursor = std::io::Cursor::new(packet_header_buf);
- let packet_sz = cursor.read_u16::<NetworkEndian>()?;
+ let mut cursor = std::io::Cursor::new(packet_header_buf);
+ let packet_sz = cursor.read_u16::<NetworkEndian>()?;
- let mut packet_body_buf = vec![0u8; packet_sz.into()];
- rdr.read_exact(&mut packet_body_buf).await?;
+ let mut packet_body_buf = vec![0u8; packet_sz.into()];
+ rdr.read_exact(&mut packet_body_buf).await?;
- Ok(Packet::decode(&packet_body_buf)?)
+ Ok(Packet::decode(&packet_body_buf)?)
}
pub fn read_packet_sync<R: Read>(rdr: &mut R) -> anyhow::Result<Packet> {
- let mut packet_header_buf = [0u8; 2];
- rdr.read_exact(&mut packet_header_buf)?;
+ let mut packet_header_buf = [0u8; 2];
+ rdr.read_exact(&mut packet_header_buf)?;
- let mut cursor = std::io::Cursor::new(packet_header_buf);
- let packet_sz = cursor.read_u16::<NetworkEndian>()?;
+ let mut cursor = std::io::Cursor::new(packet_header_buf);
+ let packet_sz = cursor.read_u16::<NetworkEndian>()?;
- let mut packet_body_buf = vec![0u8; packet_sz.into()];
- rdr.read_exact(&mut packet_body_buf)?;
+ let mut packet_body_buf = vec![0u8; packet_sz.into()];
+ rdr.read_exact(&mut packet_body_buf)?;
- Ok(Packet::decode(&packet_body_buf)?)
+ Ok(Packet::decode(&packet_body_buf)?)
}