~nickbp/tokio-scgi

ref: 1c2fda997db455700b4eb7460bbcbb28d49c92de tokio-scgi/src/abortable_stream.rs -rw-r--r-- 2.4 KiB
1c2fda99Nick Parker Move bin/* to example/*, implement example client 2 years ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#![deny(warnings, rust_2018_idioms)]

use tokio::prelude::*;

/// Item wrapper that the wrapped Stream is expected to yield. The chosen variant signals to the
/// AbortableStream whether it may keep calling the wrapped Stream after handing out the payload.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AbortableItem<T> {
    /// Hand out the payload and keep reading from the wrapped Stream afterwards
    Continue(T),

    /// Hand out the payload, then make no further reads from the wrapped Stream
    Stop(T),
}

/// Wraps an underlying stream, looking for a Stop value. When Stop is observed, it will return None
/// on the next poll.
///
/// `S` is the wrapped stream, `T` is the item type handed to the consumer, and `E` is the error
/// type accepted by the optional error converter.
#[derive(Debug)]
pub struct AbortableStream<S, T, E> {
    /// The wrapped stream that items are read from.
    stream: S,
    /// Optional converter applied to errors from the wrapped stream. When present, its result is
    /// emitted in place of the error; when absent, errors are passed through unchanged.
    err_conv: Option<fn(E) -> Option<T>>,
    /// When true, polling returns None without reading from the wrapped stream.
    stop: bool,
}

impl<S, T, E> AbortableStream<S, T, E> {
    /// Creates a new instance, wrapping the provided stream and using the provided callback to
    /// convert errors before outputting them.
    pub fn with_err_conv(stream: S, err_conv: fn(E) -> Option<T>) -> AbortableStream<S, T, E> {
        AbortableStream {
            stream,
            err_conv: Some(err_conv),
            stop: false,
        }
    }

    /// Creates a new instance, wrapping the provided stream and passing through received errors
    /// directly.
    pub fn new(stream: S) -> AbortableStream<S, T, E> {
        AbortableStream {
            stream,
            err_conv: None,
            stop: false,
        }
    }
}

/// Stream implementation that unwraps `AbortableItem`s coming from the wrapped stream and stops
/// reading from the wrapped stream once the output has ended.
impl<S, T, E> Stream for AbortableStream<S, T, E>
where
    S: Stream<Item = AbortableItem<T>, Error = E>,
{
    type Item = T;
    type Error = E;

    /// Polls the wrapped stream for the next item.
    ///
    /// - `Continue(item)` yields `item` and leaves the wrapper active.
    /// - `Stop(item)` yields `item`; every later poll returns `None` without touching the wrapped
    ///   stream.
    /// - End of the wrapped stream yields `None` and latches the wrapper, so the finished stream
    ///   is never polled again.
    /// - Errors are passed to the error converter when one was provided, and its result is
    ///   emitted as the next item. If the converter returns `None` the wrapper is latched as
    ///   well. Without a converter the error is returned unchanged.
    fn poll(&mut self) -> Poll<Option<T>, Self::Error> {
        if self.stop {
            // Do not read from the wrapped stream, just exit.
            return Ok(Async::Ready(None));
        }
        match self.stream.poll() {
            // Interpret AbortableItem flag:
            Ok(Async::Ready(Some(AbortableItem::Continue(item)))) => Ok(Async::Ready(Some(item))),
            Ok(Async::Ready(Some(AbortableItem::Stop(item)))) => {
                self.stop = true;
                Ok(Async::Ready(Some(item)))
            }
            // The wrapped stream has finished: remember that, so that a later poll of this
            // wrapper does not poll an already-terminated stream.
            Ok(Async::Ready(None)) => {
                self.stop = true;
                Ok(Async::Ready(None))
            }
            // Passthrough:
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Err(err) => {
                // Use error converter, if provided.
                match self.err_conv {
                    Some(err_conv) => {
                        let converted = err_conv(err);
                        if converted.is_none() {
                            // The converter turned the error into end-of-stream. Latch it so
                            // the wrapped stream is not read again after we reported the end.
                            self.stop = true;
                        }
                        Ok(Async::Ready(converted))
                    }
                    None => Err(err),
                }
            }
        }
    }
}