~nickbp/kapiti

3d30283ef6f3473ce797785b9e3daa7e80b761eb — Nick Parker 7 months ago 1d8dac9
Tidy up runner structure a bit
2 files changed, 131 insertions(+), 106 deletions(-)

M src/config.rs
M src/runner.rs
M src/config.rs => src/config.rs +2 -1
@@ 76,6 76,7 @@ pub struct Config {
    #[serde(default = "default_listen_https")]
    pub listen_https: String,

    /// How long to wait between checks for changes to local and remote filters.
    #[serde(default = "default_filter_refresh_seconds")]
    pub filter_refresh_seconds: u64,



@@ 147,7 148,7 @@ fn default_listen_https() -> String {
}

fn default_filter_refresh_seconds() -> u64 {
    3600
    10800 // 3 hours
}

/// A config value that can be provided either as a string or a list of strings in the TOML file.

M src/runner.rs => src/runner.rs +129 -105
@@ 1,3 1,4 @@
use std::collections::HashMap;
use std::fs;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::PathBuf;


@@ 9,6 10,7 @@ use anyhow::{bail, Context, Result};
use async_lock::Barrier;
use async_net::{TcpListener, TcpStream, UdpSocket};
use bytes::BytesMut;
use hyper::Client;
use smol::Task;
use tracing::{self, debug, info, warn};



@@ 148,7 150,6 @@ impl Runner {
            });
        }

        let resolver = client::upstream::parse_upstreams(cache_tx.clone(), &self.config.upstreams)?;
        let filters_dir = self.storage_dir.join("filters");
        if !filters_dir.exists() {
            fs::create_dir(&filters_dir).with_context(|| {


@@ 174,79 175,37 @@ impl Runner {

        // Set up a thread for periodically reloading filters.
        // Avoids using an async task because they dislike interacting with mutexes.
        let config_filters_copy = self.config.filters.clone();
        let config_filter_refresh = Duration::from_secs(self.config.filter_refresh_seconds);
        let filter_copy = filter.clone();
        let thread_fn = move || {
            smol::block_on(async move {
                let fetch_client = hyper_smol::client_kapiti(resolver, false, false, 4096);
                let span = tracing::info_span!("update-filters");
                let _enter = span.enter();
                // TODO(#9) for local paths, run inotify-based updates in a separate thread from this one
                loop {
                    for (_name, conf) in &config_filters_copy {
                        let mut filters = Vec::new();
                        for entry in &conf.overrides {
                            if let Ok(download_path_str) = filter::update_url(
                                &fetch_client,
                                &filters_dir,
                                &entry,
                                10000,
                            ).await {
                                if let Ok(filter) = reader::read(
                                    reader::FilterType::OVERRIDE,
                                    reader::FileInfo {
                                        source_path: entry.clone(),
                                        local_path: download_path_str
                                    }
                                ) {
                                    filters.push(filter);
                                }
                            }
                        }
                        for entry in &conf.blocks {
                            if let Ok(download_path_str) = filter::update_url(
                                &fetch_client,
                                &filters_dir,
                                &entry,
                                10000,
                            ).await {
                                if let Ok(filter) = reader::read(
                                    reader::FilterType::BLOCK,
                                    reader::FileInfo {
                                        source_path: entry.clone(),
                                        local_path: download_path_str,
                                    }
                                ) {
                                    filters.push(filter);
        {
            let resolver = client::upstream::parse_upstreams(cache_tx.clone(), &self.config.upstreams)?;
            let filter_copy = filter.clone();
            let filters_copy = self.config.filters.clone();
            let filter_refresh = Duration::from_secs(self.config.filter_refresh_seconds);
            thread_handles.push(
                thread::Builder::new()
                    .name("filter-reload".to_string())
                    .spawn(move || {
                        smol::block_on(async move {
                            let fetch_client = hyper_smol::client_kapiti(resolver, false, false, 4096);
                            let span = tracing::info_span!("update-filters");
                            let _enter = span.enter();
                            loop {
                                refresh_remote_filters(&filters_dir, &filter_copy, &filters_copy, &fetch_client).await;
                                if filter_refresh.as_secs() == 0 {
                                    info!("Exiting filter refresh: filter_refresh_seconds=0");
                                    break;
                                }
                                info!("Next filter refresh in {:?}", filter_refresh);
                                thread::sleep(filter_refresh);
                            }
                        }
                        if let Ok(mut filter_locked) = filter_copy.lock() {
                            filter_locked.update_entries(filters);
                        } else {
                            warn!("Failed to lock filter for entry update");
                        }
                    }
                    if config_filter_refresh.as_secs() == 0 {
                        info!("Exiting filter refresh: filter_refresh_seconds=0");
                        break;
                    }
                    info!("Next filter refresh in {:?}", config_filter_refresh);
                    thread::sleep(config_filter_refresh);
                }
            });
        };
        thread_handles.push(
            thread::Builder::new()
                .name("filter-reload".to_string())
                .spawn(thread_fn)?,
        );
                        });
                    })?
            );
        }

        // Start independent threads to handle received requests, query upstreams, and send back responses
        let response_timeout = Duration::from_millis(1000);
        for i in 0..10 {
            let mut server = lookup::Lookup::new(
            let mut lookup = lookup::Lookup::new(
                client::upstream::parse_upstreams(cache_tx.clone(), &self.config.upstreams)?,
                filter.clone(),
            );


@@ 256,43 215,13 @@ impl Runner {
                smol::block_on(async move {
                    let mut tcp_buf = BytesMut::with_capacity(MAX_TCP_BYTES as usize);
                    loop {
                        let request: RequestMsg;
                        match server_query_rx_copy.lock() {
                            Err(e) => {
                                warn!("Failed to lock receive queue, trying again: {:?}", e);
                                continue;
                            }
                            Ok(server_query_rx_lock) => {
                                // Grab a request from the queue, then release the lock
                                if let Ok(got_request) = server_query_rx_lock.recv().await {
                                    request = got_request;
                                } else {
                                    info!("Exiting thread: request queue has closed.");
                                    return;
                                }
                            }
                        }

                        let span = tracing::info_span!("handle-query");
                        let _enter = span.enter();
                        match request.data {
                            RequestData::Udp(buf) => {
                                listen_udp::handle_udp_request(&mut server, request.src, buf, &udp_sock_copy)
                                    .await
                            }
                            RequestData::Tcp(tcp_stream) => {
                                listen_tcp::handle_tcp_request(
                                    &mut server,
                                    &response_timeout,
                                    request.src,
                                    tcp_stream,
                                    &mut tcp_buf,
                                )
                                .await;
                                // Reset buffer size afterwards (capacity should stay the same)
                                tcp_buf.resize(MAX_TCP_BYTES as usize, 0);
                            }
                        }
                        handle_next_request(
                            &server_query_rx_copy,
                            &mut lookup,
                            &mut tcp_buf,
                            &udp_sock_copy,
                            &response_timeout
                        ).await;
                    }
                });
            };


@@ 334,3 263,98 @@ impl Runner {
        Ok(())
    }
}

/// Sets up a thread for periodically reloading filters.
/// Avoids using an async task because they dislike interacting with mutexes.
async fn refresh_remote_filters(filters_dir: &PathBuf, filter: &Arc<Mutex<filter::Filter>>, filter_configs: &HashMap<String, config::ConfigFilter>, fetch_client: &Client<hyper_smol::SmolConnector>) {
    for (_name, conf) in filter_configs {
        let mut filters = Vec::new();
        for entry in &conf.overrides {
            if let Ok(download_path_str) = filter::update_url(
                fetch_client,
                filters_dir,
                entry,
                10000,
            ).await {
                if let Ok(filter) = reader::read(
                    reader::FilterType::OVERRIDE,
                    reader::FileInfo {
                        source_path: entry.clone(),
                        local_path: download_path_str
                    }
                ) {
                    filters.push(filter);
                }
            }
        }
        for entry in &conf.blocks {
            if let Ok(download_path_str) = filter::update_url(
                fetch_client,
                filters_dir,
                entry,
                10000,
            ).await {
                if let Ok(filter) = reader::read(
                    reader::FilterType::BLOCK,
                    reader::FileInfo {
                        source_path: entry.clone(),
                        local_path: download_path_str,
                    }
                ) {
                    filters.push(filter);
                }
            }
        }
        if let Ok(mut filter_locked) = filter.lock() {
            filter_locked.update_entries(filters);
        } else {
            warn!("Failed to lock filter for entry update");
        }
    }
}

async fn handle_next_request(
    server_query_rx: &Mutex<async_channel::Receiver<RequestMsg>>,
    lookup: &mut lookup::Lookup,
    tcp_buf: &mut BytesMut,
    udp_sock: &UdpSocket,
    response_timeout: &Duration
) {
    let request: RequestMsg;
    match server_query_rx.lock() {
        Err(e) => {
            warn!("Failed to lock receive queue, trying again: {:?}", e);
            return;
        }
        Ok(server_query_rx_lock) => {
            // Grab a request from the queue, then release the lock
            if let Ok(got_request) = server_query_rx_lock.recv().await {
                request = got_request;
            } else {
                info!("Exiting thread: request queue has closed.");
                return;
            }
        }
    }

    let span = tracing::info_span!("handle-query");
    let _enter = span.enter();
    match request.data {
        RequestData::Udp(buf) => {
            listen_udp::handle_udp_request(lookup, request.src, buf, udp_sock)
                .await
        }
        RequestData::Tcp(tcp_stream) => {
            listen_tcp::handle_tcp_request(
                lookup,
                response_timeout,
                request.src,
                tcp_stream,
                tcp_buf,
            )
                .await;
            // Reset buffer size afterwards (capacity should stay the same)
            tcp_buf.resize(MAX_TCP_BYTES as usize, 0);
        }
    }
}