~nickbp/kapiti

f6784acfc587a7e7da21d8c82ecc37a262058bc2 — Nick Parker 1 year, 10 months ago 248adad
Fix https filter URLs for some hosts, tidy up error handling/messages
4 files changed, 100 insertions(+), 87 deletions(-)

M src/filter/filter.rs
M src/filter/reader.rs
M src/hyper_smol.rs
M src/runner.rs
M src/filter/filter.rs => src/filter/filter.rs +15 -6
@@ 4,6 4,7 @@ use std::vec::Vec;
use anyhow::{Context, Result};
use hyper::{Client, Uri};
use sha2::{Digest, Sha256};
use tracing::trace;

use crate::filter::{path, reader, updater};
use crate::{fetcher, hyper_smol};


@@ 127,13 128,17 @@ pub async fn update_if_url(
    if let Ok(filter_uri) = Uri::try_from(filter_path_or_url) {
        // Filesystem paths can get parsed as URLs with no scheme
        if filter_uri.scheme() == None {
            trace!(
                "Assuming that no-schema filter path is local: {}",
                filter_path_or_url
            );
            return Ok((
                reader::FileInfo {
                    source_path: filter_path_or_url.clone(),
                    local_path: filter_path_or_url.clone(),
                },
                false
            ))
                false,
            ));
        }

        let fetcher = fetcher::Fetcher::new(10 * 1024 * 1024, None);


@@ 153,25 158,29 @@ pub async fn update_if_url(
            download_path.as_path(),
            timeout_ms,
        )
            .await?;
        .await?;
        Ok((
            reader::FileInfo {
                source_path: filter_path_or_url.clone(),
                local_path: download_path
                    .to_str()
                    .with_context(|| format!("busted download path: {:?}", download_path))?
                    .to_string()
                    .to_string(),
            },
            downloaded,
        ))
    } else {
        trace!(
            "Assuming that non-url filter path is local: {}",
            filter_path_or_url
        );
        return Ok((
            reader::FileInfo {
                source_path: filter_path_or_url.clone(),
                local_path: filter_path_or_url.clone(),
            },
            false
        ))
            false,
        ));
    }
}
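
Note on the update_if_url change above: a filter entry is only treated as a download URL if it parses as a hyper::Uri and actually has a scheme, since bare filesystem paths parse as scheme-less URIs. A standalone sketch of that check (the is_remote helper is hypothetical, not part of the commit):

    use std::convert::TryFrom;

    use hyper::Uri;

    /// Hypothetical helper mirroring the check in update_if_url: only treat
    /// an entry as remote if it parses as a Uri that actually has a scheme.
    fn is_remote(path_or_url: &str) -> bool {
        match Uri::try_from(path_or_url) {
            // "/etc/filters/block.txt" parses fine, but with scheme() == None.
            Ok(uri) => uri.scheme().is_some(),
            // Not parseable as a Uri at all: also treat as a local path.
            Err(_) => false,
        }
    }

    fn main() {
        assert!(!is_remote("/etc/filters/block.txt"));
        assert!(is_remote("https://example.com/hosts.txt"));
    }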


M src/filter/reader.rs => src/filter/reader.rs +36 -59
@@ 71,7 71,7 @@ impl FilterContent {
        line_num: usize,
        host: &str,
    ) -> Result<()> {
        let host = validate_host(source_info, line_num, host)?;
        let host = validate_host(host)?;
        self.entries.insert(
            host,
            FilterEntry {


@@ 90,7 90,7 @@ impl FilterContent {
        line_num: usize,
        host: &str,
    ) -> Result<()> {
        let host = validate_host(source_info, line_num, host)?;
        let host = validate_host(host)?;
        self.entries.insert(
            host,
            FilterEntry {


@@ 123,7 123,7 @@ impl FilterContent {
        host: &str,
        dest: Ipv4Addr,
    ) -> Result<()> {
        let host = validate_host(source_info, line_num, host)?;
        let host = validate_host(host)?;
        // If the entry is already present, keep it and just update the ipv4 value.
        // This is mainly for the case of both ipv4+ipv6 entries in the same file.
        let initial_val = FilterEntry {


@@ 149,7 149,7 @@ impl FilterContent {
        host: &str,
        dest: Ipv6Addr,
    ) -> Result<()> {
        let host = validate_host(source_info, line_num, host)?;
        let host = validate_host(host)?;
        // If the entry is already present, keep it and just update the ipv6 value.
        // This is mainly for the case of both ipv4+ipv6 entries in the same file.
        let initial_val = FilterEntry {


@@ 175,14 175,14 @@ pub fn block_hardcoded(block_names: &[&str]) -> Result<FilterFile> {
    for name in block_names {
        content.add_block_hardcoded(name);
    }
    Ok(FilterFile{
    Ok(FilterFile {
        info: None,
        content,
    })
}

/// Reads an override file from disk, returning a parsed list of entries
pub fn read_override(info: FileInfo) -> Result<FilterFile> {
pub fn read_override(info: &FileInfo) -> Result<FilterFile> {
    let path_str = info.local_path.clone();
    let path = Path::new(&path_str);
    let file = File::open(path).with_context(|| format!("Failed to open file {:?}", info))?;


@@ 193,7 193,7 @@ pub fn read_override(info: FileInfo) -> Result<FilterFile> {
    }
}

fn read_override_imp<T: Read>(info: FileInfo, file: T) -> Result<FilterFile> {
fn read_override_imp<T: Read>(info: &FileInfo, file: T) -> Result<FilterFile> {
    let mut reader = BufReader::new(file);
    let mut buf = String::new();
    let mut line_num = 0;


@@ 205,7 205,7 @@ fn read_override_imp<T: Read>(info: FileInfo, file: T) -> Result<FilterFile> {
            .with_context(|| format!("Failed to read file {:?}", info))?;
        if len == 0 {
            // EOF
            return Ok(FilterFile{
            return Ok(FilterFile {
                info: Some(info.clone()),
                content,
            });
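
The reader functions above now borrow the FileInfo (&FileInfo) instead of taking it by value, cloning it only into the returned FilterFile. That is what lets the caller in runner.rs (further down) keep file_info around for its own error messages after the read. A reduced sketch of the ownership pattern, with simplified stand-in types:

    #[derive(Clone, Debug)]
    struct FileInfo {
        source_path: String,
        local_path: String,
    }

    struct FilterFile {
        info: Option<FileInfo>,
        // parsed entries omitted in this sketch
    }

    // Borrow the info; clone only what ends up in the returned value.
    fn read_block(info: &FileInfo) -> FilterFile {
        FilterFile {
            info: Some(info.clone()),
        }
    }

    fn main() {
        let info = FileInfo {
            source_path: "https://example.com/hosts.txt".to_string(),
            local_path: "/tmp/filters/hosts.txt".to_string(),
        };
        let filter = read_block(&info);
        // info is still usable here, e.g. for a follow-up warn!/error message.
        println!("{:?} parsed: {}", info, filter.info.is_some());
    }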


@@ 236,12 236,8 @@ fn handle_override_line(
            // Second word present: a hostname for hosts-style (and more hostnames may follow)
            if let Some(_) = first.find(':') {
                // Looks like the IP is IPv6
                let ipv6_dest = Ipv6Addr::from_str(first).with_context(|| {
                    format!(
                        "Failed to parse IPv6 address in {:?} line {}: {}",
                        info, line_num, first
                    )
                })?;
                let ipv6_dest = Ipv6Addr::from_str(first)
                    .with_context(|| format!("Failed to parse IPv6 address: {}", first))?;
                out.add_override_ipv6_line(&info, line_num, second, ipv6_dest)?;
                for word in words {
                    out.add_override_ipv6_line(&info, line_num, word, ipv6_dest)?;


@@ 249,12 245,8 @@ fn handle_override_line(
                Ok(())
            } else {
                // Assume the IP is IPv4
                let ipv4_dest = Ipv4Addr::from_str(first).with_context(|| {
                    format!(
                        "Failed to parse IPv4 address in {:?} line {}: {}",
                        info, line_num, first
                    )
                })?;
                let ipv4_dest = Ipv4Addr::from_str(first)
                    .with_context(|| format!("Failed to parse IPv4 address: {}", first))?;
                out.add_override_ipv4_line(&info, line_num, second, ipv4_dest)?;
                for word in words {
                    out.add_override_ipv4_line(&info, line_num, word, ipv4_dest)?;


@@ 263,12 255,7 @@ fn handle_override_line(
            }
        } else {
            // This is an override file but there's nowhere for the override IP to go
            bail!(
                "Unexpected block-style entry in override rule {:?} line {}: {}",
                info,
                line_num,
                first
            );
            bail!("Unexpected block-style entry in override rule: {}", first);
        }
    } else {
        // Blank line (possibly after stripping any comments)
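
The message tidying in the last few hunks follows one pattern: leaf parse errors stop repeating the file and line number, because anyhow lets an outer caller attach that context once where it is known. A hedged sketch of the idea (not the exact call chain in reader.rs):

    use std::net::Ipv4Addr;
    use std::str::FromStr;

    use anyhow::{Context, Result};

    // Leaf error: only says what failed to parse.
    fn parse_dest(word: &str) -> Result<Ipv4Addr> {
        Ipv4Addr::from_str(word)
            .with_context(|| format!("Failed to parse IPv4 address: {}", word))
    }

    // The caller attaches file/line context once, instead of every leaf repeating it.
    fn parse_line(path: &str, line_num: usize, word: &str) -> Result<Ipv4Addr> {
        parse_dest(word).with_context(|| format!("in {} line {}", path, line_num))
    }

    fn main() {
        let err = parse_line("block.txt", 12, "not-an-ip").unwrap_err();
        // Alternate Display prints the chain:
        // "in block.txt line 12: Failed to parse IPv4 address: not-an-ip: <parse error>"
        println!("{:#}", err);
    }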


@@ 277,7 264,7 @@ fn handle_override_line(
}

/// Reads a block file from disk, returning a parsed list of entries
pub fn read_block(info: FileInfo) -> Result<FilterFile> {
pub fn read_block(info: &FileInfo) -> Result<FilterFile> {
    let path_str = info.local_path.clone();
    let path = Path::new(&path_str);
    let file = File::open(path).with_context(|| format!("Failed to open file {:?}", info))?;


@@ 288,7 275,7 @@ pub fn read_block(info: FileInfo) -> Result<FilterFile> {
    }
}

fn read_block_imp<T: Read>(info: FileInfo, file: T) -> Result<FilterFile> {
fn read_block_imp<T: Read>(info: &FileInfo, file: T) -> Result<FilterFile> {
    let mut reader = BufReader::new(file);
    let mut buf = String::new();
    let mut line_num = 0;


@@ 300,7 287,7 @@ fn read_block_imp<T: Read>(info: FileInfo, file: T) -> Result<FilterFile> {
            .with_context(|| format!("Failed to read file {:?}", info))?;
        if len == 0 {
            // EOF
            return Ok(FilterFile{
            return Ok(FilterFile {
                info: Some(info.clone()),
                content,
            });


@@ 330,10 317,10 @@ fn handle_block_line(
        if let Some(second) = words.next() {
            // Second word present: a hostname for hosts-style (and more hostnames may follow)
            // Skip parsing the destination IP and just block the host(s).
            trace!("{} block1:  {}", line_num, first);
            //trace!("{} block1:  {}", line_num, first);
            out.add_block_line(&info, line_num, second)?;
            for word in words {
                trace!("{} blockN:  {}", line_num, first);
                //trace!("{} blockN:  {}", line_num, first);
                out.add_block_line(&info, line_num, word)?;
            }
            Ok(())


@@ 351,7 338,7 @@ fn handle_block_line(
                let first_trim = first
                    .trim_start_matches(|c| char::is_ascii_punctuation(&c))
                    .trim_end_matches(|c| char::is_ascii_punctuation(&c));
                trace!("{} allow:   {} -> {}", line_num, first, first_trim);
                //trace!("{} allow:   {} -> {}", line_num, first, first_trim);
                out.add_allow_line(&info, line_num, first_trim)?;
            } else if first.starts_with("||") || first.starts_with("://") || first.ends_with("^") {
                // Looks like an adblock 'block' rule (frequently malformed):


@@ 367,12 354,12 @@ fn handle_block_line(
                let mut first_trim = first;
                if first_trim.ends_with("$important") {
                    // remove any '$important' from end before trimming any punctuation
                    first_trim = &first_trim[0..first.len()-10];
                    first_trim = &first_trim[0..first.len() - 10];
                }
                first_trim = first_trim
                    .trim_start_matches(|c| char::is_ascii_punctuation(&c))
                    .trim_end_matches(|c| char::is_ascii_punctuation(&c));
                trace!("{} adblock: {} -> {}", line_num, first, first_trim);
                //trace!("{} adblock: {} -> {}", line_num, first, first_trim);
                out.add_block_line(&info, line_num, &first_trim)?;
            } else {
                // Give up and assume that it's just a standalone hostname: '<host>'
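
For the adblock-style rules handled above, parsing boils down to stripping decoration until a bare hostname is left: drop a trailing $important, then trim leading/trailing ASCII punctuation such as ||, ://, and ^. The same trimming as the diff, pulled out into a standalone sketch:

    /// Reduce an adblock-style rule like "||ads.example.com^$important" to "ads.example.com".
    fn adblock_rule_to_host(rule: &str) -> &str {
        let mut trimmed = rule;
        if trimmed.ends_with("$important") {
            // "$important" is 10 chars; drop it before trimming punctuation.
            trimmed = &trimmed[..trimmed.len() - 10];
        }
        trimmed
            .trim_start_matches(|c| char::is_ascii_punctuation(&c))
            .trim_end_matches(|c| char::is_ascii_punctuation(&c))
    }

    fn main() {
        assert_eq!("ads.example.com", adblock_rule_to_host("||ads.example.com^"));
        assert_eq!("ads.example.com", adblock_rule_to_host("||ads.example.com^$important"));
    }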


@@ 397,7 384,7 @@ fn tokenize(line: &str) -> SplitAsciiWhitespace {
            } else {
                line[..comment_start_hash].split_ascii_whitespace()
            }
        },
        }
        (None, Some(comment_start)) => line[..comment_start].split_ascii_whitespace(),
        (Some(comment_start), None) => line[..comment_start].split_ascii_whitespace(),
        (None, None) => line.split_ascii_whitespace(),


@@ 410,43 397,27 @@ fn tokenize(line: &str) -> SplitAsciiWhitespace {
/// HOWEVER, in reality both of these seem to be valid:
/// - hostnames that start with a number (in the subdomain)
/// - hostnames that contain '_'
fn validate_host(source_info: &FileInfo, line_num: usize, host: &str) -> Result<String> {
fn validate_host(host: &str) -> Result<String> {
    if host.len() < 2 {
        bail!(
            "Invalid host of length {} in {:?} line {}: {}",
            host.len(),
            source_info,
            line_num,
            host
        );
        bail!("Invalid host of length {}: {}", host.len(), host);
    }
    if host.len() > 253 {
        // Don't log the host, in case it's REALLY long
        bail!(
            "Invalid host of length {} in {:?} line {}",
            host.len(),
            source_info,
            line_num
        );
        bail!("Invalid host of length {}", host.len());
    }

    for (idx, c) in host.char_indices() {
        if idx == host.len() - 1 {
            // Last char
            if !c.is_ascii_alphanumeric() {
                bail!(
                    "Invalid host in {:?} line {}: Last char must be alphanumeric: {}",
                    source_info,
                    line_num,
                    host
                );
                bail!("Invalid host, last char must be alphanumeric: {}", host);
            }
        } else {
            // First and middle char(s)
            if !c.is_ascii_alphanumeric() && c != '-' && c != '_' && c != '.' {
                bail!(
                    "Invalid host in {:?} line {}, middle chars must be alphanumeric, '-', or '.': {}",
                    source_info, line_num, host
                    "Invalid host, middle chars must be alphanumeric, '-', or '.': {}",
                    host
                );
            }
        }
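
As the doc comment above this function notes, the checks stay deliberately loose (leading digits and '_' occur in real hostnames), so what remains is: length between 2 and 253, last character alphanumeric, other characters alphanumeric or '-', '_', '.'. A hypothetical unit test against the new validate_host(host: &str) signature, in the style of the existing tests below (not part of the commit):

    #[test]
    fn test_validate_host_rules() {
        assert!(validate_host("ab").is_ok()); // minimum length is 2
        assert!(validate_host("3_tracker.example.com").is_ok()); // leading digit and '_' accepted
        assert!(validate_host("a").is_err()); // too short
        assert!(validate_host("example.com-").is_err()); // last char must be alphanumeric
        assert!(validate_host(&"x".repeat(254)).is_err()); // over 253 chars: rejected without logging the host
    }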


@@ 632,8 603,14 @@ mod tests {
            local_path: "testlocal".to_string(),
        };

        assert_eq!(false, handle_block_line("a", 4, &mut content, &file_info).is_ok());
        assert_eq!(true, handle_block_line("ab", 4, &mut content, &file_info).is_ok());
        assert_eq!(
            false,
            handle_block_line("a", 4, &mut content, &file_info).is_ok()
        );
        assert_eq!(
            true,
            handle_block_line("ab", 4, &mut content, &file_info).is_ok()
        );

        assert_eq!(
            false,

M src/hyper_smol.rs => src/hyper_smol.rs +18 -9
@@ 23,7 23,7 @@ use crate::timeout;
/// This is for use when the HTTP endpoint may need "bootstrap" resolving of its own.
pub fn client_originz(
    mut resolver: resolver::Resolver,
    http2_only: bool,
    http2: bool,
    get_ipv6: bool,
    udp_size: u16,
    connect_timeout: Duration,


@@ 75,9 75,10 @@ pub fn client_originz(

    Client::builder()
        .executor(SmolExecutor)
        .http2_only(http2_only)
        .http2_only(http2)
        .build::<_, Body>(SmolConnector {
            _resolver_task: resolver_task,
            http2,
            resolver_tx,
            connect_timeout,
        })


@@ 87,7 88,7 @@ pub fn client_originz(
/// - Using smol for the connection and async runtime
/// - Using no resolver for resolving any hostnames (hosts MUST be provided as IPs)
/// This is for use when the HTTP endpoint will be provided as an IP.
pub fn client_iponly(http2_only: bool, connect_timeout: Duration) -> Client<SmolConnector> {
pub fn client_iponly(http2: bool, connect_timeout: Duration) -> Client<SmolConnector> {
    let (resolver_tx, resolver_rx): (
        async_channel::Sender<ResolverQuery>,
        async_channel::Receiver<ResolverQuery>,


@@ 130,9 131,10 @@ pub fn client_iponly(http2_only: bool, connect_timeout: Duration) -> Client<Smol

    Client::builder()
        .executor(SmolExecutor)
        .http2_only(http2_only)
        .http2_only(http2)
        .build::<_, Body>(SmolConnector {
            _resolver_task: resolver_task,
            http2,
            resolver_tx,
            connect_timeout,
        })


@@ 142,7 144,7 @@ pub fn client_iponly(http2_only: bool, connect_timeout: Duration) -> Client<Smol
/// - Using smol for the connection and async runtime
/// - Using the system resolver for resolving any hostnames
/// This is only meant for use in internal tooling, not for client requests.
pub fn client_system(http2_only: bool, connect_timeout: Duration) -> Client<SmolConnector> {
pub fn client_system(http2: bool, connect_timeout: Duration) -> Client<SmolConnector> {
    let (resolver_tx, resolver_rx): (
        async_channel::Sender<ResolverQuery>,
        async_channel::Receiver<ResolverQuery>,


@@ 178,9 180,10 @@ pub fn client_system(http2_only: bool, connect_timeout: Duration) -> Client<Smol

    Client::builder()
        .executor(SmolExecutor)
        .http2_only(http2_only)
        .http2_only(http2)
        .build::<_, Body>(SmolConnector {
            _resolver_task: resolver_task,
            http2,
            resolver_tx,
            connect_timeout,
        })


@@ 214,6 217,8 @@ struct ResolverQuery {
pub struct SmolConnector {
    /// Handle to keep the resolver task from dying prematurely
    _resolver_task: Arc<Task<()>>,
    /// Whether to enable http2 features
    http2: bool,
    /// Channel for sending requests to the resolver task
    resolver_tx: async_channel::Sender<ResolverQuery>,
    /// Timeout for connection/handshake calls to complete


@@ 231,6 236,7 @@ impl hyper::service::Service<Uri> for SmolConnector {

    fn call(&mut self, uri: Uri) -> Self::Future {
        // Get copies for async move:
        let http2 = self.http2.clone();
        let resolver_tx_copy = self.resolver_tx.clone();
        let connect_timeout = self.connect_timeout.clone();
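
In SmolConnector::call() above, the new http2 flag is copied out of self along with the other fields so the returned future can be async move without borrowing self; bool is Copy, so the .clone() is effectively just a copy. A minimal illustration of that capture pattern (stand-in Connector type, not the real SmolConnector):

    use std::future::Future;
    use std::pin::Pin;
    use std::time::Duration;

    struct Connector {
        http2: bool,
        connect_timeout: Duration,
    }

    impl Connector {
        fn call(&mut self, host: String) -> Pin<Box<dyn Future<Output = String> + Send>> {
            // Copy what the future needs, then move the copies, so the future
            // doesn't hold a borrow of &mut self.
            let http2 = self.http2;
            let connect_timeout = self.connect_timeout;
            Box::pin(async move {
                format!("connect {} (http2={}, timeout={:?})", host, http2, connect_timeout)
            })
        }
    }

    fn main() {
        let mut c = Connector { http2: true, connect_timeout: Duration::from_secs(10) };
        println!("{}", smol::block_on(c.call("example.com".to_string())));
    }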



@@ 322,10 328,13 @@ impl hyper::service::Service<Uri> for SmolConnector {
                                format!("HTTPS TCP connect failed: {:?}", socket_addr)
                            })?;
                            // Min protocol: If things don't have at least TLS1.2 by now, we should just name and shame.
                            let mut connector =
                                TlsConnector::new().min_protocol_version(Some(Protocol::Tlsv12));
                            // ALPN: Required for http2/DoH, otherwise we get 'http2 error: protocol error: frame with invalid size'
                            let connector = TlsConnector::new()
                                .min_protocol_version(Some(Protocol::Tlsv12))
                                .request_alpns(&["h2"]);
                            // Meanwhile, required OFF for most other hosts, otherwise we get 'connection closed before message completed'
                            if http2 {
                                connector = connector.request_alpns(&["h2"]);
                            }
                            let stream =
                                timeout::timeout(connector.connect(host, stream), &connect_timeout)
                                    .await
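
This hunk is the actual "fix https filter URLs for some hosts" from the commit message: advertising ALPN h2 is required when talking HTTP/2 (the DoH upstreams), but leaving it on for ordinary HTTPS filter downloads broke some hosts, so it is now requested only when the client was built with http2 enabled. A reduced sketch of the connector setup, assuming the TlsConnector here is async-native-tls (its imports are not shown in the diff):

    use async_native_tls::{Protocol, TlsConnector};

    /// Build the TLS connector as in the diff: TLS >= 1.2 always, ALPN "h2" only for http2.
    fn build_tls_connector(http2: bool) -> TlsConnector {
        // Minimum protocol: anything older than TLS 1.2 is rejected outright.
        let mut connector = TlsConnector::new().min_protocol_version(Some(Protocol::Tlsv12));
        if http2 {
            // Required for http2/DoH; without it: "http2 error: protocol error: frame with invalid size".
            // With it on for most other hosts: "connection closed before message completed".
            connector = connector.request_alpns(&["h2"]);
        }
        connector
    }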

M src/runner.rs => src/runner.rs +31 -13
@@ 12,7 12,7 @@ use async_net::{TcpListener, TcpStream, UdpSocket};
use bytes::BytesMut;
use hyper::Client;
use smol::Task;
use tracing::{self, debug, info, trace, warn};
use tracing::{self, debug, error, info, trace, warn};

use crate::filter::{filter, reader};
use crate::{cache, client, config, hyper_smol, listen_tcp, listen_udp, lookup};


@@ 355,7 355,7 @@ impl Runner {
    }
}

/// Sets up a thread for periodically reloading filters.
/// Runs a refresh of all configured filters, redownloading and updating any that may be out of date.
/// Avoids using an async task because they dislike interacting with mutexes.
async fn refresh_filters(
    filters_dir: &PathBuf,


@@ 369,27 369,45 @@ async fn refresh_filters(
        // Then re-read the result from disk.
        let mut override_filters = Vec::new();
        for entry in &conf.overrides {
            if let Ok((file_info, downloaded)) =
                filter::update_if_url(fetch_client, filters_dir, entry, DOWNLOAD_TIMEOUT_MS).await
            match filter::update_if_url(fetch_client, filters_dir, entry, DOWNLOAD_TIMEOUT_MS).await
            {
                if downloaded || force_update {
                    if let Ok(filter) = reader::read_override(file_info) {
                        override_filters.push(filter);
                Ok((file_info, downloaded)) => {
                    if downloaded || force_update {
                        match reader::read_override(&file_info) {
                            Ok(filter) => override_filters.push(filter),
                            Err(e) => warn!(
                                "{:?}",
                                e.context(format!("Failed to read override {:?}", file_info))
                            ),
                        }
                    }
                }
                Err(e) => warn!(
                    "{:?}",
                    e.context(format!("Failed to update override {}", entry))
                ),
            }
        }

        let mut block_filters = Vec::new();
        for entry in &conf.blocks {
            if let Ok((file_info, downloaded)) =
                filter::update_if_url(fetch_client, filters_dir, entry, DOWNLOAD_TIMEOUT_MS).await
            match filter::update_if_url(fetch_client, filters_dir, entry, DOWNLOAD_TIMEOUT_MS).await
            {
                if downloaded || force_update {
                    if let Ok(filter) = reader::read_block(file_info) {
                        block_filters.push(filter);
                Ok((file_info, downloaded)) => {
                    if downloaded || force_update {
                        match reader::read_block(&file_info) {
                            Ok(filter) => block_filters.push(filter),
                            Err(e) => warn!(
                                "{:?}",
                                e.context(format!("Failed to read block filter {:?}", file_info))
                            ),
                        }
                    }
                }
                Err(e) => warn!(
                    "{:?}",
                    e.context(format!("Failed to update block filter {}", entry))
                ),
            }
        }
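
The refresh loop rewrite is the "tidy up error handling" half of the commit: the old if-let silently dropped download and parse failures, while the match logs each one with warn!, adding context to the anyhow error before printing it with {:?} so the whole cause chain is visible. The shape of that pattern, reduced to a standalone sketch (the fetch_entry stand-in is hypothetical):

    use anyhow::{Context, Result};
    use tracing::warn;

    // Hypothetical stand-in for filter::update_if_url / reader::read_*.
    fn fetch_entry(entry: &str) -> Result<String> {
        Err(anyhow::anyhow!("404 Not Found")).context(format!("GET {}", entry))
    }

    fn refresh(entries: &[&str]) -> Vec<String> {
        let mut out = Vec::new();
        for entry in entries {
            match fetch_entry(entry) {
                Ok(filter) => out.push(filter),
                // {:?} on anyhow::Error prints the full context chain.
                Err(e) => warn!("{:?}", e.context(format!("Failed to update override {}", entry))),
            }
        }
        out
    }

    fn main() {
        // Without a tracing subscriber installed the warn! is a no-op, but the flow is the same.
        assert!(refresh(&["https://example.com/hosts.txt"]).is_empty());
    }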



@@ 397,7 415,7 @@ async fn refresh_filters(
            filter_locked.update_override_entries(override_filters);
            filter_locked.update_block_entries(block_filters);
        } else {
            warn!("Failed to lock filter for entry update");
            error!("Failed to lock filter object for entry update");
        }
    }
}
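
Last, the log level for a failed filter lock goes from warn! to error!: if that lock fails, the freshly refreshed entries are silently dropped. A minimal sketch of the surrounding pattern, assuming the shared filter sits behind a std::sync::Mutex (the exact types are not visible in this diff):

    use std::sync::{Arc, Mutex};

    use tracing::error;

    // Hypothetical stand-in for the shared filter state.
    fn apply_refresh(filter: &Arc<Mutex<Vec<String>>>, override_entries: Vec<String>) {
        // lock() only fails if another thread panicked while holding the lock
        // (a poisoned mutex); losing an update that way warrants error-level logging.
        if let Ok(mut filter_locked) = filter.lock() {
            *filter_locked = override_entries;
        } else {
            error!("Failed to lock filter object for entry update");
        }
    }

    fn main() {
        let filter = Arc::new(Mutex::new(Vec::new()));
        apply_refresh(&filter, vec!["ads.example.com".to_string()]);
    }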