~nickbp/kapiti

d060ca01b420c78cbc077698b44514565118416a — Nick Parker a month ago 8b9a786
Implement clean thread teardown on sigterm/sigint

Avoids aborting things uncleanly if we're in the middle of a filter update, for example
5 files changed, 120 insertions(+), 41 deletions(-)

M Cargo.lock
M Cargo.toml
M src/filter/filter.rs
M src/main.rs
M src/runner.rs
M Cargo.lock => Cargo.lock +1 -0
@@ 975,6 975,7 @@ dependencies = [
 "scopeguard",
 "serde",
 "sha2",
 "signal-hook",
 "smol",
 "tempfile",
 "tokio",

M Cargo.toml => Cargo.toml +1 -0
@@ 46,6 46,7 @@ rustls-native-certs = "0.5"
scopeguard = "1.1"
serde = { version = "1.0", features = ["derive"] }
sha2 = "0.9"
signal-hook = "0.3.8"
smol = "1.2"
tokio = { version = "1.0.1", default-features = false } # for hyper-smol compatibility shim
toml = "0.5"

M src/filter/filter.rs => src/filter/filter.rs +1 -1
@@ 2,7 2,7 @@ use std::path::{Path, PathBuf};
use std::vec::Vec;

use anyhow::{Context, Result};
use hyper::{Client, Uri};
use hyper::Client;
use sha2::{Digest, Sha256};

use crate::filter::{downloader, path, reader};

M src/main.rs => src/main.rs +12 -2
@@ 1,12 1,14 @@
use std::env;
use std::ffi::CString;
use std::sync::Arc;
use std::thread;

use anyhow::{bail, Context, Result};
use async_lock::Barrier;
use git_testament::{git_testament, render_testament};
use libc;
use nix::unistd;
use signal_hook::{consts, iterator};
use tracing::{self, debug, info};

use kapiti::runner::Runner;


@@ 73,9 75,17 @@ fn main() -> Result<()> {
    }

    // Run the service indefinitely, it will exit when the stop barrier returns.
    // For now the stop barrier does nothing, but we could attach it to a unix signal listener or similar to allow clean shutdown.
    let barrier = Arc::new(Barrier::new(2));
    smol::block_on(runner.run(barrier.clone()))
    let barrier_copy = barrier.clone();
    let mut signals = iterator::Signals::new(&[consts::SIGINT, consts::SIGTERM])?;
    thread::spawn(move || {
        // Clean shutdown on Ctrl+C or TERM
        for sig in signals.forever() {
            info!("Received signal: {:?}", sig);
            smol::block_on(barrier_copy.wait());
        }
    });
    smol::block_on(runner.run(barrier))
}

/// Downgrades the user running the process to the provided username

M src/runner.rs => src/runner.rs +105 -38
@@ 151,6 151,31 @@ impl Runner {
            });
        }

        // Spawn task to trigger timer-based filter refreshes
        let (filter_refresh_tx, filter_refresh_rx): (
            async_channel::Sender<()>,
            async_channel::Receiver<()>,
        ) = async_channel::bounded(1);

        // Kick an initial refresh notification
        filter_refresh_tx.send(()).await?;
        let filter_refresh_timer_task = if self.config.filter_refresh_seconds == 0 {
            info!("Skipping filter refresh loop: filter_refresh_seconds=0");
            None
        } else {
            let filter_refresh = Duration::from_secs(self.config.filter_refresh_seconds);
            Some(smol::spawn(async move {
                loop {
                    async_io::Timer::after(filter_refresh).await;
                    debug!("Notifying filter refresh");
                    if let Err(e) = filter_refresh_tx.send(()).await {
                        warn!("Got error when sending refresh to queue: {}", e);
                        // Continue loop in case this is some kind of temporary issue...
                    }
                }
            }))
        };

        let filters_dir = self.storage_dir.join("filters");
        if !filters_dir.exists() {
            fs::create_dir(&filters_dir).with_context(|| {


@@ 175,12 200,13 @@ impl Runner {
        let mut thread_handles = Vec::new();

        // Set up a thread for periodically reloading filters.
        // Avoids using an async task because they dislike interacting with mutexes.
// Avoids only using an async task because they dislike interacting with mutexes.
        {
            let resolver = client::upstream::parse_upstreams(cache_tx.clone(), &self.config.upstreams)?;
            let filter_copy = filter.clone();
            // For now we assume that the list of filters is constant.
            // Once we start supporting live updates of the filter list, this will need to be restructured.
            let filters_copy = self.config.filters.clone();
            let filter_refresh = Duration::from_secs(self.config.filter_refresh_seconds);
            thread_handles.push(
                thread::Builder::new()
                    .name("filter-reload".to_string())


@@ 189,14 215,25 @@ impl Runner {
                            let fetch_client = hyper_smol::client_kapiti(resolver, false, false, 4096);
                            let span = tracing::info_span!("update-filters");
                            let _enter = span.enter();
                            // Ensure that the filters are loaded in on the first pass, even if they
                            // weren't redownloaded due to an up-to-date local cached copy.
                            let mut force_load = true;
                            loop {
                                refresh_filters(&filters_dir, &filter_copy, &filters_copy, &fetch_client).await;
                                if filter_refresh.as_secs() == 0 {
                                    info!("Exiting filter refresh: filter_refresh_seconds=0");
                                if let Ok(_) = filter_refresh_rx.recv().await {
                                    debug!("Refreshing filters with force_load={}", force_load);
                                    refresh_filters(
                                        &filters_dir,
                                        &filter_copy,
                                        &filters_copy,
                                        &fetch_client,
                                        force_load
                                    ).await;
                                    force_load = false;
                                    debug!("Filters refreshed");
                                } else {
                                    info!("Exiting filter refresh thread: timer queue has closed.");
                                    break;
                                }
                                info!("Next filter refresh in {:?}", filter_refresh);
                                thread::sleep(filter_refresh);
                            }
                        });
                    })?


@@ 216,13 253,15 @@ impl Runner {
                smol::block_on(async move {
                    let mut tcp_buf = BytesMut::with_capacity(MAX_TCP_BYTES as usize);
                    loop {
                        handle_next_request(
                        if !handle_next_request(
                            &server_query_rx_copy,
                            &mut lookup,
                            &mut tcp_buf,
                            &udp_sock_copy,
                            &response_timeout
                        ).await;
                        ).await {
                            break;
                        }
                    }
                });
            };


@@ 240,19 279,26 @@ impl Runner {
        stop.wait().await;
        info!("Shutting down: stop signal received");

        // First, drop the task handles for the TCP/UDP listeners so that they stop.
        // First, drop the task handles for the TCP/UDP listeners and filter refresh timer so that they stop.
        drop(tcp_listener_task);
        drop(udp_listener_task);
        if let Some(task) = filter_refresh_timer_task {
            drop(task);
        }

        // Now that the listeners have stopped, the processing queue should eventually empty.
        // Once empty, the queue should return None to the threads since its inputs (the listeners) have all been dropped.
        // At that point the threads should exit on their own.
        for thread in thread_handles {
            let thread_str = format!("{:?}", &thread);
            debug!("Waiting for thread to exit: {}", thread_str);
            let thread_name = if let Some(name) = &thread.thread().name() {
                name.to_string()
            } else {
                "???".to_string()
            };
            info!("Waiting for thread to exit: {}", thread_name);
// Would use with_context, but join() fails with Box<dyn Any>, which doesn't implement Error
            match thread.join().err() {
                Some(e) => bail!("Failed to wait for thread to exit: {} {:?}", thread_str, e),
                Some(e) => bail!("Failed to wait for thread to exit: {} {:?}", thread_name, e),
                None => {}
            }
        }


@@ 271,19 317,32 @@ async fn refresh_filters(
    filters_dir: &PathBuf,
    filter: &Arc<Mutex<filter::Filter>>,
    filter_configs: &HashMap<String, config::ConfigFilter>,
    fetch_client: &Client<hyper_smol::SmolConnector>
    fetch_client: &Client<hyper_smol::SmolConnector>,
    force_update: bool
) {
    for (_name, conf) in filter_configs {
        let mut filters = Vec::new();
        // For each override/block, check the URL (or do nothing for a local path)
        // Then re-read the result from disk.
        for entry in &conf.overrides {
            if let Some(filter) = refresh_filter(fetch_client, filters_dir, entry, reader::FilterType::OVERRIDE).await {
            if let Some(filter) = refresh_filter(
                fetch_client,
                filters_dir,
                entry,
                reader::FilterType::OVERRIDE,
                force_update
            ).await {
                filters.push(filter);
            }
        }
        for entry in &conf.blocks {
            if let Some(filter) = refresh_filter(fetch_client, filters_dir, entry, reader::FilterType::BLOCK).await {
            if let Some(filter) = refresh_filter(
                fetch_client,
                filters_dir,
                entry,
                reader::FilterType::BLOCK,
                force_update
            ).await {
                filters.push(filter);
            }
        }


@@ 297,32 356,39 @@ async fn refresh_filters(

async fn refresh_filter(
    fetch_client: &Client<hyper_smol::SmolConnector>,
    filters_dir: &PathBuf,
    fetch_dir: &PathBuf,
    filter_path_or_url: &String,
    filter_type: reader::FilterType,
    force_update: bool,
) -> Option<reader::FilterEntries> {
    if let Ok(filter_uri) = Uri::try_from(filter_path_or_url) {
        // Parsed as a URL, try to download
        // Filesystem paths can get parsed as URLs with no scheme
// NOTE: the combined `cond && let Ok(..)` form below (an unstable let-chain) crashed the compiler, hence the nested-if rewrite:
        if filter_uri.scheme() != None && let Ok((local_path, _downloaded)) = filter::update_url(
            fetch_client,
            filters_dir,
            filter_path_or_url,
            10000,
        ).await {
            if let Ok(filter) = reader::read(
                filter_type,
                reader::FileInfo {
                    source_path: filter_path_or_url.clone(),
                    local_path
        if filter_uri.scheme() != None {
            // Looks like a URL, check for updated version to download
            if let Ok((local_path, updated)) = filter::update_url(
                fetch_client,
                fetch_dir,
                filter_path_or_url,
                10000,
            ).await {
                if updated || force_update {
                    // The file had an update to download, or an update/read is being forced
                    if let Ok(filter) = reader::read(
                        filter_type,
                        reader::FileInfo {
                            source_path: filter_path_or_url.clone(),
                            local_path
                        }
                    ) {
                        return Some(filter);
                    }
                }
            ) {
                return Some(filter);
            }
            return None;
        }
        return None
    } else if let Ok(filter) = reader::read(
    }

    if let Ok(filter) = reader::read(
        filter_type,
        reader::FileInfo {
            source_path: filter_path_or_url.clone(),


@@ 340,20 406,20 @@ async fn handle_next_request(
    tcp_buf: &mut BytesMut,
    udp_sock: &UdpSocket,
    response_timeout: &Duration
) {
) -> bool {
    let request: RequestMsg;
    match server_query_rx.lock() {
        Err(e) => {
            warn!("Failed to lock receive queue, trying again: {:?}", e);
            return;
            return true;
        }
        Ok(server_query_rx_lock) => {
            // Grab a request from the queue, then release the lock
            if let Ok(got_request) = server_query_rx_lock.recv().await {
                request = got_request;
            } else {
                info!("Exiting thread: request queue has closed.");
                return;
                info!("Exiting handler thread: request queue has closed.");
                return false;
            }
        }
    }


@@ 378,4 444,5 @@ async fn handle_next_request(
            tcp_buf.resize(MAX_TCP_BYTES as usize, 0);
        }
    }
    false
}