~deciduously/re-redis

88127af006b916cc3134ecbd001368b67edbc5ae — Ben Lovy 3 years ago ea778ee master
Get/Set
5 files changed, 307 insertions(+), 22 deletions(-)

M Cargo.lock
M Cargo.toml
M src/bin/client.rs
M src/bin/server.rs
M src/lib.rs
M Cargo.lock => Cargo.lock +54 -0
@@ 10,6 10,21 @@ dependencies = [
]

[[package]]
name = "ansi_term"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
dependencies = [
 "winapi",
]

[[package]]
name = "anyhow"
version = "1.0.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf8dcb5b4bbaa28653b647d8c77bd4ed40183b48882e130c1f1ffb73de069fd7"

[[package]]
name = "async-channel"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 223,6 238,16 @@ dependencies = [
]

[[package]]
name = "ctor"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fbaabec2c953050352311293be5c6aba8e141ba19d6811862b232d6fd020484"
dependencies = [
 "quote",
 "syn",
]

[[package]]
name = "ctrlc"
version = "3.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 233,6 258,12 @@ dependencies = [
]

[[package]]
name = "difference"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"

[[package]]
name = "env_logger"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 513,6 544,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"

[[package]]
name = "output_vt100"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53cdc5b785b7a58c5aad8216b3dfa114df64b0b06ae6e1501cef91df2fbdf8f9"
dependencies = [
 "winapi",
]

[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 564,6 604,18 @@ dependencies = [
]

[[package]]
name = "pretty_assertions"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f81e1644e1b54f5a68959a29aa86cde704219254669da328ecfdf6a1f09d427"
dependencies = [
 "ansi_term",
 "ctor",
 "difference",
 "output_vt100",
]

[[package]]
name = "pretty_env_logger"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 613,11 665,13 @@ dependencies = [
name = "re-redis"
version = "0.1.0"
dependencies = [
 "anyhow",
 "async-std",
 "chrono",
 "ctrlc",
 "futures",
 "log",
 "pretty_assertions",
 "pretty_env_logger",
]


M Cargo.toml => Cargo.toml +5 -0
@@ 7,9 7,14 @@ license = "BSD-3-Clause"

[dependencies]

anyhow = "1"
async-std = "1"
chrono = "0.4"
ctrlc = "3"
futures = "0.3"
log = "0.4"
pretty_env_logger = "0.4"

[dev-dependencies]

pretty_assertions = "0.6"
\ No newline at end of file

M src/bin/client.rs => src/bin/client.rs +78 -2
@@ 1,3 1,79 @@
pub fn main() {
    println!("Client!")
use async_std::{
    io::{stdin, BufReader},
    net::{SocketAddr, TcpStream},
    prelude::*,
    task,
};
use chrono::Local;
use log::info;
use re_redis::{init_logging, now, time_log, Result, DEFAULT_PORT};

use futures::{select, FutureExt};

/// Client for interacting with server.
#[derive(Debug)]
struct Client {
    server_addr: SocketAddr,
}

impl Client {
    pub fn new(port: u16) -> Self {
        Self {
            server_addr: SocketAddr::from(([127, 0, 0, 1], port)),
        }
    }

    async fn try_run(&self) -> Result<()> {
        let stream = TcpStream::connect(self.server_addr).await?;
        time_log!("Client connected to {}.", &self.server_addr);
        let (reader, mut writer) = (&stream, &stream);
        let mut lines_from_server = BufReader::new(reader).lines().fuse();
        let mut lines_from_stdin = BufReader::new(stdin()).lines().fuse();
        loop {
            select! {
                line = lines_from_server.next().fuse() => match line {
                    Some(line) => {
                        let line = line?;
                        println!("{}", line);
                    },
                    None => {
                        println!("Server disconnected.");
                        break
                    },
                },
                line = lines_from_stdin.next().fuse() => match line {
                    Some(line) => {
                        let line = line?;
                        writer.write_all(line.as_bytes()).await?;
                        writer.write_all(b"\n").await?;
                    }
                    None => break,
                }
            }
        }
        Ok(())
    }
}

impl Default for Client {
    fn default() -> Self {
        Self::new(DEFAULT_PORT)
    }
}

pub fn main() -> Result<()> {
    init_logging(2);
    ctrlc::set_handler(|| {
        time_log!("Client process recieved Ctrl-C, shutting down.",);
        std::process::exit(0);
    })?;

    let client = Client::default();

    let fut = client.try_run();
    if let Err(e) = task::block_on(fut) {
        eprintln!("Error: {}", e);
    }

    Ok(())
}

M src/bin/server.rs => src/bin/server.rs +27 -19
@@ 7,7 7,7 @@ use async_std::{
use chrono::Local;
use futures::{channel::mpsc, select, sink::SinkExt, FutureExt};
use log::info;
use re_redis::{init_logging, now, time_log};
use re_redis::{init_logging, now, time_log, DataStore, Result, DEFAULT_PORT};
use std::{
    collections::hash_map::{Entry, HashMap},
    sync::Arc,


@@ 15,10 15,6 @@ use std::{

type Sender<T> = mpsc::UnboundedSender<T>;
type Receiver<T> = mpsc::UnboundedReceiver<T>;
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

/// By default, Redis runs on port 6379, we'll use 2000 for building this.
const DEFAULT_PORT: u16 = 2000;

/// The various events that can occur
#[derive(Debug)]


@@ 28,11 24,9 @@ enum Event {
        shutdown: Receiver<Void>,
    },
    Command {
        stream: Arc<TcpStream>,
        msg: String,
    },
    //Broadcast {
    //    msg: String,
    //},
}

/// This empty type enforces that no messages are sent down the shutdown channel.


@@ 54,7 48,7 @@ impl Server {
    }

    /// Run the server.
    pub async fn accept_loop(&self) -> Result<()> {
    pub async fn accept_loop(self) -> Result<()> {
        let listener = TcpListener::bind(self.addr).await?;

        // Set up broker's channel and task


@@ 68,7 62,7 @@ impl Server {
            let stream = stream?;

            // Spawn a new independent task for each client
            let _handle = task::spawn(Server::connection_loop(broker_send.clone(), stream));
            let _handle = task::spawn(Self::connection_loop(broker_send.clone(), stream));
        }
        drop(broker_send);
        broker_handle.await?;


@@ 94,7 88,13 @@ impl Server {
        while let Some(line) = lines.next().await {
            let line = line?;
            let msg = line.to_string();
            broker.send(Event::Command { msg }).await.unwrap()
            broker
                .send(Event::Command {
                    stream: Arc::clone(&stream),
                    msg,
                })
                .await
                .unwrap()
        }

        Ok(())


@@ 104,6 104,7 @@ impl Server {
    async fn broker_loop(events: Receiver<Event>) -> Result<()> {
        let (disconnect_send, mut disconnect_recv) = // 1
        mpsc::unbounded::<(usize, Receiver<String>)>();
        let mut data = DataStore::new(); // The Redis key/value store.
        let mut peers: HashMap<usize, Sender<String>> = HashMap::new();
        let mut id: usize = 0;
        let mut events = events.fuse();


@@ 122,19 123,22 @@ impl Server {
                },
            };
            match event {
                //Event::Broadcast { msg } => {
                //    for (_id, peer) in peers.iter_mut() {
                //        peer.send(msg.clone()).await?;
                //    }
                //}
                Event::Command { msg: _ } => {
                    // TODO - REDIS
                Event::Command { stream, msg } => {
                    time_log!("Command received from {}: {}", stream.peer_addr()?, &msg);

                    let response = match data.handle_command(&msg) {
                        Ok(resp) => resp,
                        Err(e) => e.to_string(),
                    };
                    time_log!("Response: {}", response);
                    (&*stream).write_all(response.as_bytes()).await?;
                    (&*stream).write_all(b"\n").await?;
                }
                Event::NewPeer { stream, shutdown } => {
                    match peers.entry(id) {
                        Entry::Occupied(..) => (),
                        Entry::Vacant(entry) => {
                            time_log!("New client {}: {}", id, stream.peer_addr()?);
                            time_log!("New connection: Client {} ({})", id, stream.peer_addr()?);

                            // Send a welcome message.
                            // TODO can/should this be a separate type of Event, sent from connection_loop?


@@ 163,6 167,10 @@ impl Server {
                }
            }
        }
        for (_id, peer) in peers.iter_mut() {
            peer.send("Server shutting down...sorry!\n".to_string())
                .await?;
        }
        drop(peers);
        drop(disconnect_send);
        Ok(())

M src/lib.rs => src/lib.rs +143 -1
@@ 1,5 1,15 @@
use anyhow::anyhow;
use log::{debug, trace, warn};
use std::env::{set_var, var};
use std::{
    collections::hash_map::{Entry, HashMap},
    env::{set_var, var},
    str::FromStr,
};

pub type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

/// By default, Redis runs on port 6379, we'll use 2000 for building this.
pub const DEFAULT_PORT: u16 = 2000;

/// Start pretty_env_logger
pub fn init_logging(level: u8) {


@@ 52,3 62,135 @@ macro_rules! time_log {
        info!("[{}]: {}", now!(), format!($msg, $( $args ),*) );
    }
}

/// The various commands implemented.
#[derive(Debug, PartialEq)]
pub enum Command {
    Get(String),
    Set(String, String),
}

impl FromStr for Command {
    type Err = anyhow::Error;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let mut parts = s.split_whitespace();
        match parts.next() {
            Some(cmd) => {
                let args = parts.collect::<Vec<&str>>(); // The rest gets collected
                match cmd {
                    "GET" => {
                        if args.len() != 1 {
                            Err(anyhow!(
                                "(error) ERR wrong number of arguments for GET command"
                            ))
                        } else {
                            Ok(Command::Get(args[0].to_string()))
                        }
                    }
                    "SET" => {
                        if args.len() != 2 {
                            Err(anyhow!(
                                "(error) ERR wrong number of arguments for SET command"
                            ))
                        } else {
                            Ok(Command::Set(args[0].to_string(), args[1].to_string()))
                        }
                    }
                    _ => Err(anyhow!("(error) ERR unknown command")),
                }
            }
            None => Err(anyhow!("(error) ERR no command supplied")),
        }
    }
}

/// The key/value store we're all here for.
#[derive(Debug, Default)]
pub struct DataStore {
    data: HashMap<String, String>,
}

impl DataStore {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn handle_command(&mut self, cmd_str: &str) -> Result<String> {
        match Command::from_str(cmd_str) {
            Ok(cmd) => match cmd {
                Command::Get(key) => match self.data.entry(key) {
                    Entry::Occupied(entry) => Ok(entry.get().clone()),
                    Entry::Vacant(_entry) => Ok("(nil)".to_string()),
                },
                Command::Set(key, value) => {
                    self.data.entry(key).or_insert(value);
                    Ok("OK".to_string())
                }
            },
            Err(e) => Ok(format!("{:?}", e)),
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use pretty_assertions::assert_eq;

    #[test]
    fn test_command_from_str() {
        assert_eq!(
            Command::from_str("GET cow").unwrap(),
            Command::Get("cow".to_string())
        );
        assert_eq!(
            Command::from_str("GET").err().unwrap().to_string().as_str(),
            "(error) ERR wrong number of arguments for GET command"
        );
        assert_eq!(
            Command::from_str("GET cow bessie")
                .err()
                .unwrap()
                .to_string()
                .as_str(),
            "(error) ERR wrong number of arguments for GET command"
        );
        assert_eq!(
            Command::from_str("SET cow")
                .err()
                .unwrap()
                .to_string()
                .as_str(),
            "(error) ERR wrong number of arguments for SET command"
        );
        assert_eq!(
            Command::from_str("SET").err().unwrap().to_string().as_str(),
            "(error) ERR wrong number of arguments for SET command"
        );
        assert_eq!(
            Command::from_str("SET cow bessie red")
                .err()
                .unwrap()
                .to_string()
                .as_str(),
            "(error) ERR wrong number of arguments for SET command"
        );
        assert_eq!(
            Command::from_str("JUMP higher")
                .err()
                .unwrap()
                .to_string()
                .as_str(),
            "(error) ERR unknown command"
        );
        assert_eq!(
            Command::from_str("").err().unwrap().to_string().as_str(),
            "(error) ERR no command supplied"
        );
        assert_eq!(
            Command::from_str("SET cow bessie").ok().unwrap(),
            Command::Set("cow".to_string(), "bessie".to_string())
        );
    }
}