~nickbp/tokio-scgi

79d639b4e3f903681b4bd29a2ea20481d81e20dc — Nick Parker 2 years ago 7ae8509
Finish example support for repeated reads
2 files changed, 63 insertions(+), 40 deletions(-)

M examples/client.rs
M examples/server.rs
M examples/client.rs => examples/client.rs +30 -16
@@ 51,26 51,40 @@ async fn main() -> Result<(), Error> {
/// Runs the client: Sends a request and prints the responses via the provided UDS or TCP connection.
async fn run_client<C>(conn: &mut C) -> Result<(), Error>
where C: AsyncRead + AsyncWrite + std::marker::Send + std::marker::Unpin + std::fmt::Debug {
    let (mut tx_scgi, rx_scgi) = Framed::new(conn, SCGICodec::new()).split();
    let (mut tx_scgi, mut rx_scgi) = Framed::new(conn, SCGICodec::new()).split();

    // Send request
    tx_scgi.send(build_request()).await?;

    // Consume response
    // TODO support looping over multiple response calls?
    match rx_scgi.into_future().await {
        (None, _) => Err(Error::new(ErrorKind::Other, "No response received")),
        (Some(Err(e)), _) => Err(Error::new(
            ErrorKind::Other,
            format!("Error when waiting for query result: {}", e),
        )),
        (Some(Ok(response)), _) => {
            match String::from_utf8(response.to_vec()) {
                Ok(s) => println!("Got {} bytes:\n{}", response.len(), s),
                Err(e) => println!("{} byte response is not UTF8 ({}):\n{:?}", response.len(), e, response)
            }
            Ok(())
        },
    // Consume response(s): loop until error or empty data returned
    loop {
        match rx_scgi.into_future().await {
            (None, new_rx) => {
                // SCGI response not ready: loop for more rx data
                // Shouldn't happen for response data, but this is how it would work...
                println!("Response data is incomplete, resuming read");
                rx_scgi = new_rx;
            },
            (Some(Err(e)), _new_rx) => {
                // RX error: return error and abort
                return Err(Error::new(
                    ErrorKind::Other,
                    format!("Error when waiting for response: {}", e),
                ));
            },
            (Some(Ok(response)), new_rx) => {
                // Got SCGI response: if empty, treat as end of response.
                if response.len() == 0 {
                    return Ok(());
                }
                // Otherwise 'handle' by printing content, then resume read for more
                rx_scgi = new_rx;
                match String::from_utf8(response.to_vec()) {
                    Ok(s) => println!("Got {} bytes:\n{}", response.len(), s),
                    Err(e) => println!("{} byte response is not UTF8 ({}):\n{:?}", response.len(), e, response)
                }
            },
        }
    }
}


M examples/server.rs => examples/server.rs +33 -24
@@ 119,31 119,40 @@ macro_rules! http_response {
async fn serve<C>(conn: C) -> Result<(), Error>
where C: AsyncRead + AsyncWrite + std::marker::Send + std::marker::Unpin + std::fmt::Debug {
    let mut handler = SampleHandler::new();
    let (mut tx_scgi, rx_scgi) = Framed::new(conn, SCGICodec::new()).split();
    match rx_scgi.into_future().await {
        (None, _new_rx) => {
            // SCGI request not ready: loop for more rx data
            // TODO loop
            tx_scgi.send(handle_error(
                Error::new(ErrorKind::Other, "TODO partial requests aren't supported")
            )).await
        },
        (Some(Err(e)), _new_rx) =>
            // RX error: return error and abort
            Err(Error::new(ErrorKind::Other, format!("Error when waiting for request: {}", e))),
        (Some(Ok(request)), _) =>
            // Got SCGI request: pass to handler
            match handler.handle(request) {
                Ok(Some(r)) =>
                    // Response ready: send and exit
                    tx_scgi.send(r).await,
                Ok(None) =>
                    // Response not ready: loop for more rx data
                    Ok(()), // TODO loop
                Err(e) =>
                    // Handler error: respond with formatted error message
                    tx_scgi.send(handle_error(e)).await,
    let (mut tx_scgi, mut rx_scgi) = Framed::new(conn, SCGICodec::new()).split();

    loop {
        match rx_scgi.into_future().await {
            (None, new_rx) => {
                // SCGI request not ready: loop for more rx data
                println!("Request read returned None, resuming read");
                rx_scgi = new_rx;
            },
            (Some(Err(e)), _new_rx) => {
                // RX error: return error and abort
                return Err(Error::new(
                    ErrorKind::Other,
                    format!("Error when waiting for request: {}", e),
                ));
            },
            (Some(Ok(request)), new_rx) =>
                // Got SCGI request: pass to handler
                match handler.handle(request) {
                    Ok(Some(r)) => {
                        // Response ready: send and exit
                        return tx_scgi.send(r).await;
                    },
                    Ok(None) => {
                        // Response not ready: loop for more rx data
                        println!("Request data is incomplete, resuming read");
                        rx_scgi = new_rx;
                    },
                    Err(e) => {
                        // Handler error: respond with formatted error message
                        return tx_scgi.send(handle_error(e)).await;
                    }
                },
        }
    }
}