~nickbp/tokio-scgi

1c2fda997db455700b4eb7460bbcbb28d49c92de — Nick Parker 2 years ago 91449cf
Move bin/* to example/*, implement example client
6 files changed, 168 insertions(+), 78 deletions(-)

M Cargo.lock
M Cargo.toml
A examples/client.rs
R src/bin/server.rs => examples/server.rs
M src/abortable_stream.rs
D src/bin/client.rs
M Cargo.lock => Cargo.lock +1 -0
@@ 586,6 586,7 @@ dependencies = [
 "proptest 0.9.4 (registry+https://github.com/rust-lang/crates.io-index)",
 "tokio 0.1.21 (registry+https://github.com/rust-lang/crates.io-index)",
 "tokio-codec 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "tokio-sync 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]

M Cargo.toml => Cargo.toml +2 -1
@@ 11,4 11,5 @@ tokio-codec = "0.1.1"
bytes = "0.4.12"

[dev-dependencies]
proptest = "0.9.4"
\ No newline at end of file
proptest = "0.9.4"
tokio-sync = "0.1.6"
\ No newline at end of file

A examples/client.rs => examples/client.rs +137 -0
@@ 0,0 1,137 @@
#![deny(warnings, rust_2018_idioms)]

use bytes::{BufMut, BytesMut};
use std::env;
use std::io::{Error, ErrorKind};
use std::net::ToSocketAddrs;
use std::path::Path;
use tokio::net::{TcpStream, UnixStream};
use tokio::prelude::*;
use tokio_codec::Framed;
use tokio_scgi::client::{SCGICodec, SCGIRequest};
use tokio_sync::oneshot;
use tokio_sync::oneshot::Sender;

fn syntax() -> Error {
    println!(
        "Syntax: {} </path/to/unix.sock or tcp-host:1234>",
        env::args().nth(0).unwrap()
    );
    Error::new(ErrorKind::InvalidInput, "Missing required argument")
}

fn main() -> Result<(), Error> {
    if env::args().len() <= 1 {
        return Err(syntax());
    }
    let endpoint = env::args().nth(1).unwrap();
    if endpoint.starts_with('-') {
        // Probably a commandline argument like '-h'/'--help', avoid parsing as a hostname
        return Err(syntax());
    }

    let (sender, receiver) = oneshot::channel::<Option<BytesMut>>();
    if endpoint.contains('/') {
        // Probably a path to a file, assume the argument is a unix socket
        let addr = Path::new(&endpoint);
        println!("Connecting to {}", addr.display());
        connect(UnixStream::connect(&addr), sender);
    } else {
        // Probably a TCP endpoint, try to resolve it in case it's a hostname
        let addr = endpoint
            .to_socket_addrs()
            .expect(format!("Invalid TCP endpoint '{}'", endpoint).as_str())
            .next()
            .unwrap();
        println!("Connecting to {}", addr);
        connect(TcpStream::connect(&addr), sender);
    }

    match receiver.wait() {
        Ok(Some(response)) => {
            println!("response = {:?}", response);
            Ok(())
        }
        Ok(None) => Err(Error::new(ErrorKind::Other, "No response received")),
        Err(e) => Err(Error::new(
            ErrorKind::Other,
            format!("Error when waiting for query result: {}", e),
        )),
    }
}

/// Schedules a `send()` call to be triggered after the connection is made.
fn connect<C, F>(connect_future: F, output: Sender<Option<BytesMut>>)
where
    C: AsyncRead + AsyncWrite + std::marker::Send + std::fmt::Debug + 'static,
    F: Future<Item = C, Error = Error> + std::marker::Send + 'static,
{
    let cb = connect_future
        .map_err(|e| {
            println!("connect error = {:?}", e);
            //output.send(None);
        })
        .and_then(move |conn| {
            send(conn, output);
            Ok(())
        });
    // The first one in the chain must use tokio::run.
    // tokio::spawn can only be called inside the runtime.
    tokio::run(cb);
}

/// Schedules sending the request payload. Once the send is complete, `recv()` is called for
/// handling the response.
fn send<C>(conn: C, output: Sender<Option<BytesMut>>)
where
    C: AsyncRead + AsyncWrite + std::marker::Send + std::fmt::Debug + 'static,
{
    let (tx_scgi, rx_scgi) = Framed::new(conn, SCGICodec::new()).split();
    let cb = tx_scgi
        .send(build_request())
        .map_err(|e| {
            println!("send error = {:?}", e);
            //output.send(None);
        })
        .and_then(move |_| {
            recv(rx_scgi, output);
            Ok(())
        });
    tokio::spawn(cb);
}

/// Schedules receiving the response. In this demo the response is printed to the console.
fn recv<R>(rx_scgi: R, output: Sender<Option<BytesMut>>)
where
    R: Stream<Item = BytesMut, Error = Error> + std::marker::Send + std::fmt::Debug + 'static,
{
    // TODO repeatedly recv until disconnected by server?
    let cb = rx_scgi
        .into_future()
        .map_err(|e| {
            println!("recv error = {:?}", e);
            //output.send(None);
        })
        .and_then(move |(response, _stream)| {
            match output.send(response) {
                Ok(()) => println!("Sent response"),
                Err(_response) => println!("Failed to send response"),
            }
            Ok(())
        });
    tokio::spawn(cb);
}

fn build_request() -> SCGIRequest {
    let content_str = b"{\"description\": \"my name is also bort\"}";
    let mut content = BytesMut::with_capacity(content_str.len());
    content.put(content_str.to_vec());

    let mut headers = Vec::new();
    headers.push(("Content-Length".to_string(), content_str.len().to_string()));
    headers.push(("SCGI".to_string(), "1".to_string()));
    headers.push(("Content-Type".to_string(), "application/json".to_string()));
    headers.push(("X-Username".to_string(), "bort".to_string()));

    SCGIRequest::Request(headers, content)
}

R src/bin/server.rs => examples/server.rs +21 -13
@@ 29,8 29,10 @@ fn main() -> Result<(), Error> {
    let endpoint = env::args().nth(1).unwrap();
    if endpoint.starts_with('-') {
        // Probably a commandline argument like '-h'/'--help', avoid parsing as a hostname
        Err(syntax())
    } else if endpoint.contains('/') {
        return Err(syntax());
    }

    if endpoint.contains('/') {
        // Probably a path to a file, assume the argument is a unix socket
        tokio::run(
            unix_init(endpoint)?


@@ 38,28 40,21 @@ fn main() -> Result<(), Error> {
                .map_err(|e| println!("Unix socket failed: {:?}", e))
                .for_each(|conn| serve(conn)),
        );
        Ok(())
    } else {
        // Probably a TCP endpoint, try to resolve it in case it's a hostname
        let addr = endpoint
            .to_socket_addrs()
            .expect(format!("Invalid TCP endpoint '{}'", endpoint).as_str())
            .next()
            .unwrap();
        println!("Listening on {}", addr);
        tokio::run(
            TcpListener::bind(&addr)?
            tcp_init(endpoint)?
                .incoming()
                .map_err(|e| println!("TCP socket failed: {:?}", e))
                .for_each(|conn| serve(conn)),
        );
        Ok(())
    }
    Ok(())
}

fn unix_init(path_str: String) -> Result<UnixListener, Error> {
    // Try to delete the socket file. Avoids AddrInUse errors. No-op if already missing.
    let path = Path::new(&path_str);
    // Try to delete the socket file. Avoids AddrInUse errors. No-op if already missing.
    fs::remove_file(path)
        .and_then(|()| {
            println!("Deleted existing {}", path.display());


@@ 87,6 82,19 @@ fn unix_init(path_str: String) -> Result<UnixListener, Error> {
    Ok(socket)
}

fn tcp_init(endpoint_str: String) -> Result<TcpListener, Error> {
    let addr = endpoint_str
        .to_socket_addrs()
        .expect(format!("Invalid TCP endpoint '{}'", endpoint_str).as_str())
        .next()
        .unwrap();

    let socket = TcpListener::bind(&addr)?;
    println!("Listening on {}", addr);

    Ok(socket)
}

macro_rules! http_response {
    ($response_code:expr, $content_type:expr, $content:expr) => {
        format!(


@@ 123,7 131,7 @@ where
    //     has effectively said there's nothing left to be read from there.
    // 7b. If Continue was returned, rx_scgi is queried for more data and the cycle continues.
    let session = tx_scgi
        .send_all(AbortableStream::new_err(
        .send_all(AbortableStream::with_err_conv(
            rx_scgi.and_then(move |request| match handler.handle(request) {
                Ok(r) => Ok(r),
                Err(e) => Ok(AbortableItem::Stop(handle_error(e))),

M src/abortable_stream.rs => src/abortable_stream.rs +7 -7
@@ 17,17 17,17 @@ pub enum AbortableItem<T> {
/// on the next poll.
pub struct AbortableStream<S, T, E> {
    stream: S,
    err_handler: Option<fn(E) -> Option<T>>,
    err_conv: Option<fn(E) -> Option<T>>,
    stop: bool,
}

impl<S, T, E> AbortableStream<S, T, E> {
    /// Creates a new instance, wrapping the provided stream and using the provided callback to
    /// convert errors before outputting them.
    pub fn new_err(stream: S, err_handler: fn(E) -> Option<T>) -> AbortableStream<S, T, E> {
    pub fn with_err_conv(stream: S, err_conv: fn(E) -> Option<T>) -> AbortableStream<S, T, E> {
        AbortableStream {
            stream,
            err_handler: Some(err_handler),
            err_conv: Some(err_conv),
            stop: false,
        }
    }


@@ 37,7 37,7 @@ impl<S, T, E> AbortableStream<S, T, E> {
    pub fn new(stream: S) -> AbortableStream<S, T, E> {
        AbortableStream {
            stream,
            err_handler: None,
            err_conv: None,
            stop: false,
        }
    }


@@ 66,9 66,9 @@ where
            Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(err) => {
                // Use custom handler, or fall back to just forwarding the error as-is.
                match self.err_handler {
                    Some(err_handler) => Ok(Async::Ready(err_handler(err))),
                // Use error converter, if provided.
                match self.err_conv {
                    Some(err_conv) => Ok(Async::Ready(err_conv(err))),
                    None => Err(err),
                }
            }

D src/bin/client.rs => src/bin/client.rs +0 -57
@@ 1,57 0,0 @@
#![deny(warnings, rust_2018_idioms)]

fn main() -> () {
    println!("hi")
}

// TODO just write a stdlib sync version, to show tokio usage isnt required
/*
use std::env;
use std::io::Error;
use std::path::Path;
use tokio;
use tokio::net::{TcpStream, UnixStream};
use tokio::prelude::*;
use tokio_codec::Framed;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio_scgi::{SCGICodec, SCGIRequest};

fn main() -> Result<(), Error> {
    let endpoint = env::args().nth(1).unwrap_or("/tmp/scgi.sock".to_string());

    let mut headers = Vec::new();
    headers.push(("USERNAME".to_string(), "bort".to_string()));

    let mut sendme = Vec::new();
    sendme.push(SCGIRequest::Headers(headers));
    sendme.push(SCGIRequest::BodyFragment("my name is also bort".to_string().into_bytes()));

    if endpoint.contains('/') {
        // Probably a path to a file
        let path = Path::new(&endpoint);
        println!("Connecting to Unix {}", path.display());
        send_request(UnixStream::connect(&path), sendme);
        Ok(())
    } else {
        // Probably a TCP endpoint
        let addr = SocketAddr::from_str(endpoint.as_str())
            .expect(format!("Invalid endpoint: {}", endpoint).as_str());
        println!("Connecting to TCP {}", addr);
        send_request(TcpStream::connect(&addr), sendme);
        Ok(())
    }
}

fn send_request<T: Future + Stream>(foo: T, sendme: Vec<SCGIRequest>) {
    foo.and_then(|stream| {
        Framed::new(stream, SCGICodec::new()).send(sendme.get(0).expect("a").clone())
        //.send_all(stream::iter_ok(sendme))
    }).and_then(|result| {
        println!("Send result: {:?}", result);
        Ok(())
    }).map_err(|err| {
        println!("Something failed: {:?}", err);
    }).poll().expect("c");
}
*/