~nickbp/kapiti

8b9a78699f4881d88045a41dc1f647f983564889 — Nick Parker 6 months ago 3d30283
More cleanup and a rust compiler crash
3 files changed, 85 insertions(+), 68 deletions(-)

M src/filter/downloader.rs
M src/filter/filter.rs
M src/runner.rs
M src/filter/downloader.rs => src/filter/downloader.rs +5 -3
@@ 20,13 20,14 @@ use crate::hyper_smol;
/// If the local path exists and has an mtime newer than the remote file at the URL, the download is skipped.
/// The client uses the external resolver, ensuring that the query is NOT affected by local filters.
/// The path meanwhile must have a ".zstd" extension or an error will be returned.
/// Returns a boolean for whether the file was actually downloaded (true) or skipped (false)
pub async fn update_file(
    client: &Client<hyper_smol::SmolConnector>,
    fetcher: &Fetcher,
    url: &String,
    path: &Path,
    timeout_ms: u64,
) -> Result<()> {
) -> Result<bool> {
    let file_mtime = get_file_mtime_ms(path)
        .with_context(|| format!("Failed to check local copy of {} at {:?}", url, path))?;
    let (head_redirect_url, needs_update) = match file_mtime {


@@ 43,7 44,7 @@ pub async fn update_file(
    if !needs_update {
        // File exists and is up to date
        info!("Skipping download of {}: Local copy is up to date", url);
        return Ok(());
        return Ok(false);
    }

    // File doesn't exist, or file is out of date. Get a new version.


@@ 116,7 117,8 @@ pub async fn update_file(
            "Failed to rename downloaded filter file from {:?} to {:?}",
            tmp_path, path
        )
    })
    })?;
    Ok(true)
}

async fn file_needs_update(

M src/filter/filter.rs => src/filter/filter.rs +26 -32
@@ 1,4 1,3 @@
use std::convert::TryFrom;
use std::path::{Path, PathBuf};
use std::vec::Vec;



@@ 109,44 108,39 @@ impl Filter {
    }
}

/// Returns the local path where the file was downloaded,
/// and whether the file was updated (true) or the update was skipped (false)
pub async fn update_url(
    fetch_client: &Client<hyper_smol::SmolConnector>,
    filters_dir: &PathBuf,
    hosts_path: &String,
    uri_string: &String,
    timeout_ms: u64,
) -> Result<String> {
    if let Ok(host_uri) = Uri::try_from(hosts_path) {
        // Parsed as a URL, try to download
        if host_uri.scheme() == None {
            // Filesystem paths can get parsed as URLs with no scheme
            return Ok(hosts_path.to_string());
        }
        let fetcher = http::Fetcher::new(10 * 1024 * 1024, None);
        // We download files to the exact SHA of the URL string we were provided.
        // This is an easy way to avoid filename collisions in URLs: example1.com/hosts vs example2.com/hosts
        // If the user changes the URL string then that changes the SHA, perfect for "cache invalidation" purposes.
        let hosts_path_sha = Sha256::digest(hosts_path.as_bytes());
        let download_path = Path::new(filters_dir).join(format!(
            "{:x}.sha256.{}",
            hosts_path_sha,
            path::ZSTD_EXTENSION
        ));
        downloader::update_file(
            fetch_client,
            &fetcher,
            &hosts_path.to_string(),
            download_path.as_path(),
            timeout_ms,
        )
) -> Result<(String, bool)> {
    let fetcher = http::Fetcher::new(10 * 1024 * 1024, None);
    // We download files to the exact SHA of the URL string we were provided.
    // This is an easy way to avoid filename collisions in URLs: example1.com/hosts vs example2.com/hosts
    // If the user changes the URL string then that changes the SHA, perfect for "cache invalidation" purposes.
    let hosts_path_sha = Sha256::digest(uri_string.as_bytes());
    let download_path = Path::new(filters_dir).join(format!(
        "{:x}.sha256.{}",
        hosts_path_sha,
        path::ZSTD_EXTENSION
    ));
    let downloaded = downloader::update_file(
        fetch_client,
        &fetcher,
        uri_string,
        download_path.as_path(),
        timeout_ms,
    )
        .await?;
        Ok(download_path
    Ok((
        download_path
            .to_str()
            .with_context(|| format!("busted download path: {:?}", download_path))?
            .to_string())
    } else {
        // Couldn't parse as URL, assume it's a local file
        Ok(hosts_path.to_string())
    }
        .to_string(),
        downloaded
    ))
}

fn upsert_entries(entries: &mut Vec<reader::FilterEntries>, new_entry: reader::FilterEntries) {

M src/runner.rs => src/runner.rs +54 -33
@@ 1,4 1,5 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::fs;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::PathBuf;


@@ 10,7 11,7 @@ use anyhow::{bail, Context, Result};
use async_lock::Barrier;
use async_net::{TcpListener, TcpStream, UdpSocket};
use bytes::BytesMut;
use hyper::Client;
use hyper::{Client, Uri};
use smol::Task;
use tracing::{self, debug, info, warn};



@@ 189,7 190,7 @@ impl Runner {
                            let span = tracing::info_span!("update-filters");
                            let _enter = span.enter();
                            loop {
                                refresh_remote_filters(&filters_dir, &filter_copy, &filters_copy, &fetch_client).await;
                                refresh_filters(&filters_dir, &filter_copy, &filters_copy, &fetch_client).await;
                                if filter_refresh.as_secs() == 0 {
                                    info!("Exiting filter refresh: filter_refresh_seconds=0");
                                    break;


@@ 266,43 267,24 @@ impl Runner {

/// Refreshes all configured filters: downloads any remote filter lists, then re-reads them from disk.
/// Note: despite being async, lock acquisition on the shared filter mutex is blocking, so this is awaited directly rather than spawned as a detached task.
async fn refresh_remote_filters(filters_dir: &PathBuf, filter: &Arc<Mutex<filter::Filter>>, filter_configs: &HashMap<String, config::ConfigFilter>, fetch_client: &Client<hyper_smol::SmolConnector>) {
async fn refresh_filters(
    filters_dir: &PathBuf,
    filter: &Arc<Mutex<filter::Filter>>,
    filter_configs: &HashMap<String, config::ConfigFilter>,
    fetch_client: &Client<hyper_smol::SmolConnector>
) {
    for (_name, conf) in filter_configs {
        let mut filters = Vec::new();
        // For each override/block, check the URL (or do nothing for a local path)
        // Then re-read the result from disk.
        for entry in &conf.overrides {
            if let Ok(download_path_str) = filter::update_url(
                fetch_client,
                filters_dir,
                entry,
                10000,
            ).await {
                if let Ok(filter) = reader::read(
                    reader::FilterType::OVERRIDE,
                    reader::FileInfo {
                        source_path: entry.clone(),
                        local_path: download_path_str
                    }
                ) {
                    filters.push(filter);
                }
            if let Some(filter) = refresh_filter(fetch_client, filters_dir, entry, reader::FilterType::OVERRIDE).await {
                filters.push(filter);
            }
        }
        for entry in &conf.blocks {
            if let Ok(download_path_str) = filter::update_url(
                fetch_client,
                filters_dir,
                entry,
                10000,
            ).await {
                if let Ok(filter) = reader::read(
                    reader::FilterType::BLOCK,
                    reader::FileInfo {
                        source_path: entry.clone(),
                        local_path: download_path_str,
                    }
                ) {
                    filters.push(filter);
                }
            if let Some(filter) = refresh_filter(fetch_client, filters_dir, entry, reader::FilterType::BLOCK).await {
                filters.push(filter);
            }
        }
        if let Ok(mut filter_locked) = filter.lock() {


@@ 313,6 295,45 @@ async fn refresh_remote_filters(filters_dir: &PathBuf, filter: &Arc<Mutex<filter
    }
}

async fn refresh_filter(
    fetch_client: &Client<hyper_smol::SmolConnector>,
    filters_dir: &PathBuf,
    filter_path_or_url: &String,
    filter_type: reader::FilterType,
) -> Option<reader::FilterEntries> {
    if let Ok(filter_uri) = Uri::try_from(filter_path_or_url) {
        // Parsed as a URL, try to download
        // Filesystem paths can get parsed as URLs with no scheme
        // COMPILER CRASH!:
        if filter_uri.scheme() != None && let Ok((local_path, _downloaded)) = filter::update_url(
            fetch_client,
            filters_dir,
            filter_path_or_url,
            10000,
        ).await {
            if let Ok(filter) = reader::read(
                filter_type,
                reader::FileInfo {
                    source_path: filter_path_or_url.clone(),
                    local_path
                }
            ) {
                return Some(filter);
            }
        }
        return None
    } else if let Ok(filter) = reader::read(
        filter_type,
        reader::FileInfo {
            source_path: filter_path_or_url.clone(),
            local_path: filter_path_or_url.clone(),
        }
    ) {
        return Some(filter);
    }
    None
}

async fn handle_next_request(
    server_query_rx: &Mutex<async_channel::Receiver<RequestMsg>>,
    lookup: &mut lookup::Lookup,