~nickbp/originz

ref: fbaed2a25114cf06aaa5d509c0d19d28ac4faa6d originz/src/filter/downloader.rs -rw-r--r-- 11.3 KiB
fbaed2a2Nick Parker Implement benchmark test for UDP client/UDP upstream (#10) 1 year, 10 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
#![deny(warnings, rust_2018_idioms)]

use std::convert::TryFrom;
use std::fs::{self, File};
use std::io;
use std::path::Path;
use std::time::SystemTime;

use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, NaiveDateTime};
use hyper::client::HttpConnector;
use hyper::header;
use hyper::{Body, Client, HeaderMap, Method};
use hyper_rustls::HttpsConnector;
use tracing::{debug, info, level_enabled, trace, warn, Level};

use crate::client::hyper::Resolver;
use crate::filter::path;
use crate::http::Fetcher;

/// Downloads the specified URL to the specified path using the provided client.
/// If the local path exists and has an mtime newer than the URL, the download is skipped.
/// The client uses the external resolver, ensuring that the query is NOT affected by local filters.
/// The path meanwhile must have a ".zstd" extension or an error will be returned.
pub async fn update_file(
    client: &Client<HttpsConnector<HttpConnector<Resolver>>, Body>,
    fetcher: &Fetcher,
    url: &String,
    path: &Path,
) -> Result<()> {
    let file_mtime = get_file_mtime_ms(path)
        .with_context(|| format!("Failed to check local copy of {} at {:?}", url, path))?;
    let (head_redirect_url, needs_update) = match file_mtime {
        Some(file_mtime_ms_u128) => {
            let file_mtime_ms = i64::try_from(file_mtime_ms_u128)
                .with_context(|| format!("Invalid file mtime for {:?}", path))?;
            file_needs_update(client, fetcher, url, file_mtime_ms).await?
        }
        None => {
            // Local file not found, do download
            (None, true)
        }
    };
    if !needs_update {
        // File exists and is up to date
        info!("Skipping download of {}: Local copy is up to date", url);
        return Ok(());
    }

    // File doesn't exist, or file is out of date. Get a new version.
    info!("Downloading {} to {:?}", url, path);

    let get_url = match &head_redirect_url {
        // If the HEAD query hit a redirect, follow that same redirect for the following GET query
        Some(u) => u,
        // Otherwise use the original URL for the GET query
        None => url,
    };

    let mut resp = client
        .request(fetcher.build_request(&Method::GET, get_url)?)
        .await
        .with_context(|| format!("HTTP GET to {} failed", get_url))?;

    // Only allow redirect for GET query if we didn't already follow a redirect for the HEAD query.
    if head_redirect_url.is_none() && resp.status().is_redirection() {
        // Basic support for redirects: Just allow at most one redirect, and don't change request content for the new destination.
        // Intentionally basic for now, can improve later if needed.
        let loc = header_to_str(resp.headers(), &header::LOCATION, url)?;
        trace!("Following redirect: {} => {}", url, loc);
        resp = client
            .request(fetcher.build_request(&Method::GET, &loc)?)
            .await
            .with_context(|| format!("HTTP GET to {} failed", loc))?;
    }

    // Note that we just pass the original url: log the original requested value rather than the redirected value

    // Write to "file.tmp" then rename to "file[.ext]"
    let tmp_path = path.with_extension("tmp");
    // If anything fails with the download then try to delete "file.tmp" automatically.
    let _tmp_guard = scopeguard::guard(tmp_path.clone(), |path| {
        trace!("Cleaning up {:?} if it exists", path);
        let _ = fs::remove_file(path);
    });
    trace!("Downloading to {:?}", tmp_path);

    {
        let mut tmp_file = File::create(&tmp_path)?;
        if path::is_zstd_extension(path) {
            // ZSTD compression for file output
            let mut encoder =
                zstd::stream::Encoder::new(tmp_file, zstd::DEFAULT_COMPRESSION_LEVEL)?
                    .auto_finish();
            fetcher
                .write_response(&url, &mut encoder, &mut resp)
                .await?;
        } else {
            // No compression for file output
            fetcher
                .write_response(&url, &mut tmp_file, &mut resp)
                .await?;
        }
    }

    trace!("Renaming {:?} => {:?}", tmp_path, path);
    fs::rename(&tmp_path, &path).with_context(|| {
        format!(
            "Failed to rename downloaded filter file from {:?} to {:?}",
            tmp_path, path
        )
    })
}

/// Decides whether the local copy of `url` should be downloaded again.
///
/// `file_mtime_ms` is the local file's modification time in milliseconds since the Unix epoch.
/// A HEAD query is sent to `url` (following at most one redirect) and its response headers are
/// checked in this order:
///
/// 1. `Last-Modified`: an update is needed if the local mtime is older than it.
/// 2. `Date` and `Expires`: an update is needed if the local file's age exceeds the server's
///    expire duration (`Expires - Date`). Only durations are compared, never server timestamps
///    against local timestamps.
/// 3. Otherwise an update is always requested.
///
/// Returns `(redirect_url, needs_update)`, where `redirect_url` is the `Location` that the HEAD
/// query was redirected to, if any, so that the caller can reuse it.
///
/// # Errors
///
/// Returns an error if a HEAD query fails, if a redirect response has no usable `Location`,
/// or if a `Last-Modified` or `Date` header is present but cannot be parsed.
async fn file_needs_update(
    client: &Client<HttpsConnector<HttpConnector<Resolver>>, Body>,
    fetcher: &Fetcher,
    url: &String,
    file_mtime_ms: i64,
) -> Result<(Option<String>, bool)> {
    let mut redirect_url: Option<String> = None;

    // Local file already exists, see if it makes sense to update.
    // Check the server's Last-Modified response and compare it to our mtime.
    // We avoid dealing with If-Modified-Since, since support for it is apparently very inconsistent.
    // see also: https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods/HEAD
    let mut head_resp = client
        .request(fetcher.build_request(&Method::HEAD, url)?)
        .await
        .with_context(|| format!("HTTP HEAD query to {} failed", url))?;

    if head_resp.status().is_redirection() {
        // Basic support for redirects: Just allow at most one redirect, and don't change request content for the new destination.
        // Intentionally basic for now, can improve later if needed.
        let loc = header_to_str(head_resp.headers(), &header::LOCATION, url)?;
        // Repeat the HEAD query against the new location: only the headers are needed here,
        // the body is fetched separately by the caller.
        head_resp = client
            .request(fetcher.build_request(&Method::HEAD, &loc)?)
            .await
            .with_context(|| format!("HTTP HEAD query to {} failed", loc))?;
        redirect_url = Some(loc);
    }

    // Check if local file is older than server's Last-Modified.
    // A missing Last-Modified is not an error, we fall through to Date+Expires below.
    if let Ok(url_mtime_header) = header_to_str(head_resp.headers(), &header::LAST_MODIFIED, url)
    {
        if level_enabled!(Level::DEBUG) {
            debug!(
                "Existing file mtime='{}' vs {} Last-Modified='{}'",
                NaiveDateTime::from_timestamp(file_mtime_ms / 1000, 0),
                url,
                url_mtime_header
            );
        }
        let url_mtime_ms = DateTime::parse_from_rfc2822(url_mtime_header.as_str())
            .with_context(|| format!("Failed to parse Last-Modified header from {}", url))?
            .timestamp_millis();
        return Ok((redirect_url, file_mtime_ms < url_mtime_ms));
    }

    // Check if local file is older than the server's expire period defined by Date+Expires.
    // (Seen with downloads from raw.githubusercontent.com)
    let (url_date_header, url_expires_header) = match (
        header_to_str(head_resp.headers(), &header::DATE, url),
        header_to_str(head_resp.headers(), &header::EXPIRES, url),
    ) {
        (Ok(date), Ok(expires)) => (date, expires),
        _ => {
            // Missing Last-Modified and missing either Date and/or Expires. Give up and do the download.
            return Ok((redirect_url, true));
        }
    };

    if level_enabled!(Level::DEBUG) {
        debug!(
            "Existing file mtime='{}' vs {} Date='{}' + Expires='{}'",
            NaiveDateTime::from_timestamp(file_mtime_ms / 1000, 0),
            url,
            url_date_header,
            url_expires_header
        );
    }
    let url_date_ms = DateTime::parse_from_rfc2822(url_date_header.as_str())
        .with_context(|| format!("Failed to parse Date header from {}", url))?
        .timestamp_millis();
    let url_expires_ms = match DateTime::parse_from_rfc2822(url_expires_header.as_str()) {
        Ok(expires) => expires.timestamp_millis(),
        Err(e) => {
            // Servers commonly send invalid dates such as "0" or "-1" to mean "already expired"
            // (RFC 7234 section 5.3), so redownload instead of failing the whole update.
            warn!(
                "Treating unparseable Expires='{}' from {} as already expired: {}",
                url_expires_header, url, e
            );
            return Ok((redirect_url, true));
        }
    };
    if url_date_ms > url_expires_ms {
        // The server's Date timestamp is ahead of their own Expires timestamp.
        // Give up and redownload the file.
        warn!(
            "Server Date={}/{} is newer than server Expires={}/{}",
            url_date_ms, url_date_header, url_expires_ms, url_expires_header
        );
        return Ok((redirect_url, true));
    }

    // We want to download if the file is older than the difference of Expires-Date (aka the expire duration)
    // For example, if the expire duration is 5 hours, then we should only download if the local file is more than 5 hours old.
    // This allows us to avoid full parsing support of Cache-Control headers.
    // Also, we are careful to avoid comparing server timestamps against local timestamps. We only compare relative durations.
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(epoch_duration) => {
            let now_ms = i64::try_from(epoch_duration.as_millis())
                .with_context(|| "current time is invalid")?;
            if now_ms < file_mtime_ms {
                // Give up and redownload the file.
                warn!(
                    "File was modified in the future: mtime={} now={}",
                    file_mtime_ms, now_ms
                );
                return Ok((redirect_url, true));
            }
            let expire_duration_ms = url_expires_ms - url_date_ms;
            let file_age_ms = now_ms - file_mtime_ms;
            debug!(
                "File age {} vs expire duration {}",
                file_age_ms, expire_duration_ms
            );
            Ok((redirect_url, file_age_ms > expire_duration_ms))
        }
        Err(_) => {
            // Give up and redownload the file.
            warn!("Current time is before 1970");
            Ok((redirect_url, true))
        }
    }
}

/// Returns the modification time of `path` in milliseconds since the Unix epoch.
///
/// Yields `Ok(None)` when nothing exists at `path`. A modification time that predates the
/// epoch is logged and reported as `0`. Any other failure to read the metadata or the
/// modification time is returned as an error.
fn get_file_mtime_ms(path: &Path) -> Result<Option<u128>> {
    let metadata = match fs::metadata(path) {
        Ok(found) => found,
        // A missing file is an expected situation rather than a failure.
        Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
        Err(e) => {
            return Err(e).with_context(|| format!("Failed to get metadata for {:?}", path));
        }
    };

    let modified = metadata
        .modified()
        .with_context(|| format!("Failed to get modified time for {:?}", path))?;

    let since_epoch_ms = modified
        .duration_since(SystemTime::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis())
        .unwrap_or_else(|_| {
            // mtime is before epoch, lets just treat it as being created AT epoch
            warn!("File was created before 1970: {:?}", path);
            0
        });
    Ok(Some(since_epoch_ms))
}

/// Looks up `header` in `headers` and returns its value as an owned `String`.
///
/// `origin` is only used to label error messages. An error is returned when the header is
/// absent, or when its value cannot be represented as a string.
fn header_to_str(
    headers: &HeaderMap,
    header: &header::HeaderName,
    origin: &String,
) -> Result<String> {
    let raw_value = headers.get(header).ok_or_else(|| {
        anyhow!(
            "{} response has missing {:?}: {:?}",
            origin,
            header,
            headers
        )
    })?;
    raw_value.to_str().map(str::to_string).map_err(|e| {
        anyhow!(
            "Failed to convert {} {:?} to string: {:?}",
            origin,
            header,
            e
        )
    })
}