~nickbp/originz

ref: 41895a73626b9e101ebd940b9de825c18ceb27f1 originz/src/http.rs -rw-r--r-- 9.2 KiB
41895a73Nick Parker Only build binary in docker build 11 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
use std::convert::TryFrom;
use std::io::{self, Write};

use anyhow::{anyhow, bail, Context, Result};
use flate2::write::GzDecoder;
use http::request::Builder;
use hyper::body::HttpBody;
use hyper::header;
use hyper::{Body, HeaderMap, Method, Request, Response, Uri, Version};
use tracing::trace;

/// Shared code for downloading content from HTTP(s) servers.
/// Adds support for gzipped responses, restricting the download size, and checking matching content type.
pub struct Fetcher {
    /// Maximum allowed payload size in bytes, enforced while streaming the body
    /// (for gzipped responses, checked against the decompressed size).
    max_length_bytes: usize,
    /// If set, responses must have a Content-Type starting with this value, and
    /// built requests advertise it via the Accept header.
    expect_content_type: Option<String>,
    /// HTTP version advertised in built requests; HTTP/1.1 unless `use_http_2` is called.
    http_version: Version,
}

impl Fetcher {
    pub fn new(max_length_bytes: usize, expect_content_type: Option<String>) -> Fetcher {
        Fetcher {
            max_length_bytes,
            expect_content_type,
            http_version: Version::HTTP_11,
        }
    }

    pub fn use_http_2(mut self) -> Self {
        self.http_version = Version::HTTP_2;
        self
    }

    /// Builds a request that advertises support for gzipped payloads and acceptance of a specified content type.
    pub fn request_builder(&self, method: &Method, url: &Uri) -> Builder {
        let mut builder = Request::builder()
            .version(self.http_version)
            .method(method)
            .uri(url)
            .header(header::ACCEPT_ENCODING, "gzip")
            // Server may complain if we don't specify something
            .header(header::USER_AGENT, "rust/hyper");
        if let Some(expect_content_type) = &self.expect_content_type {
            builder = builder.header(header::ACCEPT, expect_content_type)
        }
        builder
    }

    /// Builds a request that advertises support for gzipped payloads and acceptance of a specified content type.
    pub fn build_request(&self, method: &Method, url: &String) -> Result<Request<Body>> {
        let uri = Uri::try_from(url)?;
        self.request_builder(method, &uri)
            .body(Body::empty())
            .with_context(|| format!("Failed to build {} {} request", method, url))
    }

    /// Writes the payload from the response `resp` to the provided output `out`.
    /// Supports decompressing gzipped payload data, and will enforce any maximum size and/or content type.
    /// `source` can be used for logging the source URL of the response.
    pub async fn write_response<W: Write>(
        &self,
        source: &String,
        out: &mut W,
        resp: &mut Response<Body>,
    ) -> Result<()> {
        if !resp.status().is_success() {
            bail!("{:?} HTTP error: {}", source, resp.status());
        }

        let headers = resp.headers();
        trace!("{} headers: {:?}", source, headers);

        // Servers may not return a Content-Length when gzip is enabled.
        // We enforce size when streaming the body content anyway.
        let content_length_opt = match header_to_str_opt(headers, &header::CONTENT_LENGTH, &source)?
        {
            Some(len_str) => {
                let len = len_str.parse::<usize>().with_context(|| {
                    format!(
                        "{:?} response Content-Length cannot be converted to usize: {}",
                        source, len_str
                    )
                })?;
                // Check advertised size, but verify when reading body.
                if len > self.max_length_bytes {
                    bail!(
                        "{:?} response Content-Length header exceeds maximum {}: {}",
                        source,
                        self.max_length_bytes,
                        len
                    );
                }
                Some(len)
            }
            None => None,
        };

        if let Some(expect_content_type) = &self.expect_content_type {
            let content_type = header_to_str(headers, &header::CONTENT_TYPE, &source)?;
            // Content-Type: "text/csv; charset=UTF-8; header=present"
            if !content_type.starts_with(expect_content_type) {
                bail!(
                    "{:?} has Content-Type {:?}, expected starts_with({})",
                    source,
                    content_type,
                    expect_content_type
                );
            }
        }

        let gzip = match header_to_str_opt(headers, &header::CONTENT_ENCODING, &source)? {
            Some(encoding) => "gzip" == encoding,
            None => {
                // Fall back to Transfer-Encoding
                match header_to_str_opt(headers, &header::TRANSFER_ENCODING, &source)? {
                    Some(encoding) => "gzip" == encoding,
                    None => false,
                }
            }
        };

        if gzip {
            self.write_body_gzip(resp, out, content_length_opt, source)
                .await
        } else {
            self.write_body_plain(resp, out, content_length_opt, source)
                .await
        }
    }

    async fn write_body_gzip<W: Write>(
        &self,
        resp: &mut Response<Body>,
        out: &mut W,
        content_length_opt: Option<usize>,
        source: &String,
    ) -> Result<()> {
        let mut downloaded: usize = 0;
        // Gzip compression for response input
        // Keep track of uncompressed bytes to avoid e.g. a malicious payload filling the disk
        let mut decoder = GzDecoder::new(CountingWriter::new(out));
        while let Some(next) = resp.data().await {
            let chunk = next.with_context(|| format!("{:?} failed to download body", source))?;
            if let Some(content_length) = content_length_opt {
                if downloaded + chunk.len() > content_length {
                    bail!(
                        "{:?} compressed response body length exceeds expected Content-Length {}",
                        source,
                        content_length
                    );
                }
            }
            downloaded += chunk.len();
            trace!(
                "got chunk {} => {}/{:?}",
                chunk.len(),
                downloaded,
                content_length_opt
            );
            // Need to use write_all() to actually flush all the input data
            // In the gzip case, regular write() will just consume some of the input
            // and then it's up to us to loop with the remaining input.
            decoder.write_all(&chunk[..])?;
            // Get the uncompressed size (so far) from our underlying CountingWriter
            if decoder.get_ref().count() > self.max_length_bytes {
                bail!(
                    "{:?} uncompressed response body length exceeds max {}",
                    source,
                    self.max_length_bytes
                );
            }
        }
        decoder.finish()?;
        Ok(())
    }

    async fn write_body_plain<W: Write>(
        &self,
        resp: &mut Response<Body>,
        out: &mut W,
        content_length_opt: Option<usize>,
        source: &String,
    ) -> Result<()> {
        let mut downloaded: usize = 0;
        // No compression for response input
        while let Some(next) = resp.data().await {
            let chunk = next.with_context(|| format!("{:?} failed to download body", source))?;
            if let Some(content_length) = content_length_opt {
                if downloaded + chunk.len() > content_length {
                    bail!(
                        "{:?} response body length exceeds expected Content-Length {}",
                        source,
                        content_length
                    );
                }
            }
            downloaded += chunk.len();
            trace!(
                "Got chunk {} => {}/{:?}",
                chunk.len(),
                downloaded,
                content_length_opt
            );
            out.write(&chunk[..])?;
        }
        Ok(())
    }
}

fn header_to_str(
    headers: &HeaderMap,
    header: &header::HeaderName,
    origin: &String,
) -> Result<String> {
    header_to_str_opt(headers, header, origin)?.with_context(|| {
        format!(
            "{} response has missing {:?}: {:?}",
            origin, header, headers
        )
    })
}

/// Looks up `header` in `headers`, returning `Ok(None)` when absent.
/// Fails only if the header value is present but not representable as a string.
/// `origin` is included in error messages for context.
fn header_to_str_opt(
    headers: &HeaderMap,
    header: &header::HeaderName,
    origin: &String,
) -> Result<Option<String>> {
    // Absent header is not an error: the caller decides how to handle None.
    let header_val = match headers.get(header) {
        Some(value) => value,
        None => return Ok(None),
    };
    match header_val.to_str() {
        Ok(header_str) => Ok(Some(header_str.to_string())),
        Err(e) => Err(anyhow!(
            "Failed to convert {} {:?} to string: {:?}",
            origin,
            header,
            e
        )),
    }
}

/// Pass-through writer that counts the number of bytes that have been written.
/// Used to consistently measure the decompressed size of a download.
struct CountingWriter<W: Write> {
    /// Destination that all writes are forwarded to.
    inner: W,
    /// Total bytes accepted by `inner` so far.
    count: usize,
}

impl<W: Write> CountingWriter<W> {
    /// Wraps `inner` with the byte count starting at zero.
    fn new(inner: W) -> CountingWriter<W> {
        CountingWriter { inner, count: 0 }
    }

    /// Returns the number of bytes written so far.
    fn count(&self) -> usize {
        self.count
    }
}

impl<W: Write> Write for CountingWriter<W> {
    /// Forwards to the inner writer and adds the number of bytes it actually
    /// accepted to the running count. (The previous version added `buf.len()`
    /// regardless, overcounting whenever the inner writer did a partial write.)
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let written = self.inner.write(buf)?;
        self.count += written;
        Ok(written)
    }

    fn flush(&mut self) -> io::Result<()> {
        self.inner.flush()
    }
}