~nickbp/kapiti

0dfbb8de73f6b881a91eac29e61e6b8d5e5a3462 — Nick Parker 7 months ago f9aaeaf
Implement polling-based background filter updates (#9)

Local files are currently only updated by the main polling loop; we could instead add a separate inotify-based update thread for them.
3 files changed, 110 insertions(+), 95 deletions(-)

M src/config.rs
M src/filter/filter.rs
M src/runner.rs
M src/config.rs => src/config.rs +9 -1
@@ 12,7 12,7 @@ use serde::{Deserialize, Deserializer};
use tracing::{trace, warn};

/// The struct representation of a Kapiti filter section
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigFilter {
    /// Zero or more file paths or URLs for block files, where URLs will be automatically updated periodically.


@@ 76,6 76,9 @@ pub struct Config {
    #[serde(default = "default_listen_https")]
    pub listen_https: String,

    #[serde(default = "default_filter_refresh_seconds")]
    pub filter_refresh_seconds: u64,

    /// Filters to apply against request.
    /// Each filter block may optionally specify applicable client IPs for that block.
    #[serde(default)]


@@ 116,6 119,7 @@ impl Config {
            listen_http: listen_random.clone(),
            listen_https: listen_random,
            upstreams: vec![upstream],
            filter_refresh_seconds: 3600,
            filters: HashMap::new(),
            redis: "".to_string(),
        }


@@ 142,6 146,10 @@ fn default_listen_https() -> String {
    "0.0.0.0:443".to_string()
}

/// Default interval between filter refresh passes, used by serde when
/// `filter_refresh_seconds` is absent from the config file: one hour.
fn default_filter_refresh_seconds() -> u64 {
    // 60 seconds * 60 minutes = 3600 seconds
    60 * 60
}

/// A config value that can be provided either as a string or a list of strings in the TOML file.
/// We convert both cases to a `Vec<String>` automatically.
fn string_or_seq_string<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>

M src/filter/filter.rs => src/filter/filter.rs +12 -74
@@ 1,16 1,14 @@
use std::convert::TryFrom;
use std::fs;
use std::path::{Path, PathBuf};
use std::vec::Vec;

use anyhow::{bail, Context, Result};
use hyper::{Body, Client, Uri};
use anyhow::{Context, Result};
use hyper::{Client, Uri};
use sha2::{Digest, Sha256};
use tracing::debug;
use tracing_attributes::instrument;

use crate::filter::{downloader, path, reader};
use crate::{http, hyper_smol, resolver};
use crate::{http, hyper_smol};

/// An iterator that goes over the parent domains of a provided child domain.
/// For example, www.domain.com => [www.domain.com, domain.com, com]


@@ 57,82 55,23 @@ impl<'a> Iterator for DomainParentIter<'a> {
pub struct Filter {
    overrides: Vec<reader::FilterEntries>,
    blocks: Vec<reader::FilterEntries>,
    filters_dir: PathBuf,
    fetch_client: Client<hyper_smol::SmolConnector, Body>,
}

impl Filter {
    pub fn new(filters_dir: PathBuf, resolver: resolver::Resolver) -> Result<Filter> {
        let fetch_client = hyper_smol::client_kapiti(resolver, false, false, 4096);

        if !filters_dir.exists() {
            fs::create_dir(&filters_dir).with_context(|| {
                format!(
                    "Failed to create filter download directory: {:?}",
                    filters_dir
                )
            })?;
        } else if filters_dir.is_file() {
            bail!(
                "Filter download directory configured storage path is a regular file: {:?}",
                filters_dir
            );
        }

        Ok(Filter {
    pub fn new() -> Filter {
        Filter {
            overrides: vec![],
            blocks: vec![],
            filters_dir,
            fetch_client,
        })
    }

    pub fn update_override(self: &mut Filter, override_path: &String) -> Result<()> {
        let file_entries = reader::read(
            reader::FilterType::OVERRIDE,
            reader::FileInfo {
                source_path: override_path.clone(),
                local_path: override_path.clone()
            }
        )?;

        // Before adding new entry, check for existing entry to be replaced/updated.
        upsert_entries(&mut self.overrides, file_entries);
        Ok(())
        }
    }

    #[instrument(skip(self))] // skip non-Debug stuff
    pub async fn update_block(
        self: &mut Filter,
        hosts_entry: &String,
        timeout_ms: u64,
    ) -> Result<()> {
        let download_path_str = update_url(
            &self.fetch_client,
            &self.filters_dir,
            &hosts_entry,
            timeout_ms,
        )
        .await?;
        let file_entries = reader::read(
            reader::FilterType::BLOCK,
            reader::FileInfo {
                source_path: hosts_entry.clone(),
                local_path: download_path_str,
    pub fn update_entries(self: &mut Filter, entrieses: Vec<reader::FilterEntries>) {
        for entries in entrieses {
            match entries.filter_type {
                reader::FilterType::BLOCK => upsert_entries(&mut self.blocks, entries),
                reader::FilterType::OVERRIDE => upsert_entries(&mut self.overrides, entries),
            }
        )?;
        if !file_entries.is_empty() {
            // Note: In theory we could dedupe entries across different blocks to save memory.
            // However this causes problems if we want to granularly update individual files.
            // For example if file A had a hostname that we omit from file B, and then file A is updated
            // to no longer mention that hostname, we'd want to reintroduce it into file B.

            // So for now the marginal gain likely isn't worth the complexity, but in the future we could
            // rebuild + dedupe a single monolithic tree each time ANY file is updated.
            // BUT this makes it harder to source filter decisions since everything will be merged.
            upsert_entries(&mut self.blocks, file_entries);
        }
        Ok(())
    }

    pub fn set_hardcoded_block(self: &mut Filter, block_names: &[&str]) -> Result<()> {


@@ 171,7 110,7 @@ impl Filter {
    }
}

async fn update_url(
pub async fn update_url(
    fetch_client: &Client<hyper_smol::SmolConnector>,
    filters_dir: &PathBuf,
    hosts_path: &String,


@@ 207,7 146,6 @@ async fn update_url(
            .to_string())
    } else {
        // Couldn't parse as URL, assume it's a local file
        debug!("file: {}", hosts_path);
        Ok(hosts_path.to_string())
    }
}

M src/runner.rs => src/runner.rs +89 -20
@@ 12,8 12,8 @@ use bytes::BytesMut;
use smol::Task;
use tracing::{self, debug, info, warn};

use crate::filter::filter;
use crate::{cache, client, config, listen_tcp, listen_udp, lookup};
use crate::filter::{filter, reader};
use crate::{cache, client, config, hyper_smol, listen_tcp, listen_udp, lookup};

/// TCP size header is 16 bits, so max theoretical size is 64k
static MAX_TCP_BYTES: u16 = 65535;


@@ 148,34 148,103 @@ impl Runner {
            });
        }

        // TODO(#9)
        // - do initialization in background: don't block (re)start on a filter being slow to update
        // - schedule periodic background update checks for filter files (should only download if URL appears updated)
        // - refrain from killing the process if a download fails or something
        let resolver = client::upstream::parse_upstreams(cache_tx.clone(), &self.config.upstreams)?;
        let mut filter = filter::Filter::new(self.storage_dir.join("filters"), resolver)?;
        let filters_dir = self.storage_dir.join("filters");
        if !filters_dir.exists() {
            fs::create_dir(&filters_dir).with_context(|| {
                format!(
                    "Failed to create filter download directory: {:?}",
                    filters_dir
                )
            })?;
        } else if filters_dir.is_file() {
            bail!(
                "Filter download directory configured storage path is a regular file: {:?}",
                filters_dir
            );
        }

        let mut filter = filter::Filter::new();
        // Set up the hardcoded values first - for now they take priority over any manual configuration.
        // There isn't a good reason for a user to override a Kapiti test domain, for example.
        filter.set_hardcoded_block(HARDCODED_BLOCKED_HOSTS.into())?;
        // TODO(#30): Implement advanced filter blocks with applies_to support, and with allow support
        for (_name, conf) in &self.config.filters {
            // Allow these path lists to be unset/empty
            for entry in &conf.overrides {
                filter.update_override(entry)?;
            }
            {
        let filter = Arc::new(Mutex::new(filter));

        let mut thread_handles = Vec::new();

        // Set up a thread for periodically reloading filters.
        // Avoids using an async task because they dislike interacting with mutexes.
        let config_filters_copy = self.config.filters.clone();
        let config_filter_refresh = Duration::from_secs(self.config.filter_refresh_seconds);
        let filter_copy = filter.clone();
        let thread_fn = move || {
            smol::block_on(async move {
                let fetch_client = hyper_smol::client_kapiti(resolver, false, false, 4096);
                let span = tracing::info_span!("update-filters");
                let _enter = span.enter();
                for entry in &conf.blocks {
                    filter.update_block(entry, 10000).await?;
                // TODO(#9) for local paths, run inotify-based updates in a separate thread from this one
                loop {
                    for (_name, conf) in &config_filters_copy {
                        let mut filters = Vec::new();
                        for entry in &conf.overrides {
                            if let Ok(download_path_str) = filter::update_url(
                                &fetch_client,
                                &filters_dir,
                                &entry,
                                10000,
                            ).await {
                                if let Ok(filter) = reader::read(
                                    reader::FilterType::OVERRIDE,
                                    reader::FileInfo {
                                        source_path: entry.clone(),
                                        local_path: download_path_str
                                    }
                                ) {
                                    filters.push(filter);
                                }
                            }
                        }
                        for entry in &conf.blocks {
                            if let Ok(download_path_str) = filter::update_url(
                                &fetch_client,
                                &filters_dir,
                                &entry,
                                10000,
                            ).await {
                                if let Ok(filter) = reader::read(
                                    reader::FilterType::BLOCK,
                                    reader::FileInfo {
                                        source_path: entry.clone(),
                                        local_path: download_path_str,
                                    }
                                ) {
                                    filters.push(filter);
                                }
                            }
                        }
                        if let Ok(mut filter_locked) = filter_copy.lock() {
                            filter_locked.update_entries(filters);
                        } else {
                            warn!("Failed to lock filter for entry update");
                        }
                    }
                    if config_filter_refresh.as_secs() == 0 {
                        info!("Exiting filter refresh: filter_refresh_seconds=0");
                        break;
                    }
                    info!("Next filter refresh in {:?}", config_filter_refresh);
                    thread::sleep(config_filter_refresh);
                }
            };
        }
        let filter = Arc::new(Mutex::new(filter));
            });
        };
        thread_handles.push(
            thread::Builder::new()
                .name("filter-reload".to_string())
                .spawn(thread_fn)?,
        );

        // Start independent threads to handle received requests, query upstreams, and send back responses
        let response_timeout = Duration::from_millis(1000);
        let mut thread_handles = Vec::new();
        for i in 0..10 {
            let mut server = lookup::Lookup::new(
                client::upstream::parse_upstreams(cache_tx.clone(), &self.config.upstreams)?,