~nickbp/kapiti

199d6ffd6807329a93141c3bdd71fb04fa2f129f — Nick Parker 4 months ago 1f8b4da
Fix build and issues with benchmark harness
6 files changed, 67 insertions(+), 34 deletions(-)

M Cargo.toml
M benches/server.rs
M src/config.rs
M src/listen_tcp.rs
M src/listen_udp.rs
M src/runner.rs
M Cargo.toml => Cargo.toml +1 -1
@@ -58,7 +58,7 @@ webpki = "0.21"
 zstd = "0.7"
 
 [dev-dependencies]
-criterion = "0.3" # for benchmarks
+criterion = { version = "0.3", features = ["html_reports"] } # for benchmarks
 csv = "1.1" # for examples/update_specs
 futures-delay-queue = "0.4" # for benchmarks
 proptest = "1.0" # for property tests
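For context, the html_reports feature opts in to criterion's HTML output, which cargo bench writes under target/criterion/. As a reference point, a minimal criterion harness using this dev-dependency might look like the following sketch (bench name and body hypothetical):

use criterion::{criterion_group, criterion_main, Criterion};

// Hypothetical benchmark: anything registered this way shows up in the
// HTML report once the html_reports feature is enabled.
fn bench_sum(c: &mut Criterion) {
    c.bench_function("sum_0_to_99", |b| b.iter(|| (0u64..100).sum::<u64>()));
}

criterion_group!(benches, bench_sum);
criterion_main!(benches);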

M benches/server.rs => benches/server.rs +36 -18
@@ -1,4 +1,4 @@
-use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket as SyncUdpSocket};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs};
 use std::sync::Arc;
 use std::thread::{self, JoinHandle};
 use std::time::Duration;


@@ -24,6 +24,7 @@ use kapiti::specs::message::{IntEnum, Question};
 const LOCAL_EPHEMERAL_ENDPOINT: &str = "127.0.0.1:0";
 
 const STUB_QUESTION_NAME: &str = "kapiti.io.";
 const STUB_FILTER_INFO: &str = "benches/server";
+const STUB_QUERY_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(1, 2, 4, 8));
 
 const UPSTREAM_LATENCY_FAST: Duration = Duration::from_nanos(1);


@@ -46,6 +47,7 @@ fn write_stub_response() -> Result<BytesMut> {
             resource_class: IntEnum::Enum(ResourceClass::INTERNET),
         },
         &None,
         &STUB_FILTER_INFO.to_string(),
+        Some(STUB_QUERY_IP),
         None,
         &mut buf,


@@ -131,7 +133,9 @@ async fn run_udp_upstream(
         ).or(
             // Check if the socket has something to receive, or...
             async {
-                let (size, dest) = udp_sock.recv_from(&mut request_buffer).await?;
+                let (size, dest) = udp_sock.recv_from(&mut request_buffer)
+                    .await
+                    .with_context(|| "failed to receive request data from udp_sock")?;
                 // Ensure that the response has a matching request ID (first two bytes)
                 if size < 2 {
                     return Err(anyhow!("Expected request to have at least 2 bytes, but got {}", size));
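The with_context calls added here use anyhow's Context extension trait; the closure only runs on the error path, so the formatted message costs nothing when the call succeeds. A standalone sketch of the pattern (file path and message hypothetical):

use anyhow::{Context, Result};

fn read_config(path: &str) -> Result<String> {
    // The closure is evaluated only if read_to_string() actually fails.
    std::fs::read_to_string(path)
        .with_context(|| format!("failed to read config at {}", path))
}

fn main() {
    // Prints the added context followed by the underlying io::Error.
    if let Err(e) = read_config("/nonexistent/kapiti.toml") {
        eprintln!("{:#}", e);
    }
}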


@@ -146,7 +150,9 @@
         ).or(
             // ... or check if we should be stopping.
             async {
-                stop_upstream_rx.recv().await?;
+                stop_upstream_rx.recv()
+                    .await
+                    .with_context(|| "failed to receive signal from stop_upstream_rx, did main thread crash?")?;
                 Ok(UpstreamEvent::Stop)
             }
         );
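Both hunks sit inside an .or() chain: presumably futures-lite's FutureExt::or, the combinator smol's ecosystem provides, which polls both async blocks and resolves with whichever finishes first. That is how the stop signal can interrupt a pending recv. A minimal sketch of the race, assuming the futures-lite, smol, and async-channel crates:

use std::time::Duration;

use futures_lite::FutureExt; // provides .or() on futures

fn main() {
    let (stop_tx, stop_rx) = async_channel::bounded::<()>(1);
    smol::block_on(async {
        stop_tx.send(()).await.unwrap();
        let event = async {
            // Stands in for a receive that would otherwise block for a while.
            smol::Timer::after(Duration::from_secs(60)).await;
            "recv"
        }
        .or(async {
            stop_rx.recv().await.unwrap();
            "stop"
        })
        .await;
        // The stop branch wins because its message was already queued.
        assert_eq!(event, "stop");
    });
}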


@@ -212,17 +218,17 @@ fn start_udp_upstream(
 }
 
 struct RunInputs {
-    client_sock: SyncUdpSocket,
+    // Had previously been using a synchronous UdpSocket but was seeing weird errors around EINTR on recv calls.
+    // So we instead just use async sockets everywhere.
+    client_sock: UdpSocket,
     response_buffer: BytesMut,
 }
 
-fn setup_udp_requests(count: u64) -> Result<Vec<RunInputs>> {
+async fn setup_udp_requests(count: u64) -> Result<Vec<RunInputs>> {
     let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
     let mut inputs_vec = Vec::new();
     for _i in 0..count {
-        let client_sock = SyncUdpSocket::bind(client_addr)?;
-        client_sock.set_read_timeout(Some(Duration::from_millis(5000)))?;
-        client_sock.set_write_timeout(Some(Duration::from_millis(5000)))?;
+        let client_sock = UdpSocket::bind(client_addr).await?;
        let mut response_buffer = BytesMut::with_capacity(4096);
         // Ensure that the buffer has a SIZE suitable for socket.recv_from().
         // If we just leave it with the CAPACITY then it drops data.
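The SIZE vs CAPACITY note is about BytesMut dereferencing to a slice of its length, not its capacity: a freshly allocated buffer has length 0, so recv_from() would read into an empty slice and the data would be lost. A sketch of the distinction:

use bytes::BytesMut;

fn main() {
    let mut response_buffer = BytesMut::with_capacity(4096);
    // &mut response_buffer[..] is an empty slice here: capacity is not length.
    assert_eq!(response_buffer.len(), 0);
    // Giving the buffer a length makes all 4096 bytes visible to recv_from().
    response_buffer.resize(4096, 0);
    assert_eq!(response_buffer.len(), 4096);
}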


@@ -237,25 +243,37 @@ fn setup_udp_requests(count: u64) -> Result<Vec<RunInputs>> {
 
 /// Sets up and runs `count` requests, waiting for a response after each request.
 /// Reinitializes every time so that `move` will work.
-fn run_udp_requests(inputs_vec: Vec<RunInputs>, kapiti_udp_endpoint: SocketAddr) -> Result<()> {
+async fn run_udp_requests(mut inputs_vec: Vec<RunInputs>, kapiti_udp_endpoint: SocketAddr) -> Result<()> {
     // Send requests for each entry in the batch
-    for inputs in &inputs_vec {
+    for (idx, inputs) in inputs_vec.iter().enumerate() {
         let sendsize = inputs
             .client_sock
             .send_to(&STUB_REQUEST[..], kapiti_udp_endpoint)
-            .with_context(|| "send_to failed")?;
+            .await
+            .with_context(|| format!(
+                "{} Bench client send_to from {:?} to {} failed",
+                idx, inputs.client_sock.local_addr(), kapiti_udp_endpoint
+            ))?;
         debug!(
-            "Bench client sent {} bytes to {:?}",
-            sendsize, kapiti_udp_endpoint
+            "{} Bench client sent {} bytes from {:?} to {}",
+            idx, sendsize, inputs.client_sock.local_addr(), kapiti_udp_endpoint
         );
     }
     // Wait for responses to come back
-    for mut inputs in inputs_vec {
+    for (idx, inputs) in inputs_vec.iter_mut().enumerate() {
+        debug!("{} recv...", idx);
         let (recvsize, recvfrom) = inputs
             .client_sock
             .recv_from(&mut inputs.response_buffer)
-            .with_context(|| "recv_from failed: no response from kapiti")?;
-        debug!("Bench client got {} bytes from {:?}", recvsize, recvfrom);
+            .await
+            .with_context(|| format!(
+                "{} Bench client recv_from failed from {} to {:?}",
+                idx, kapiti_udp_endpoint, inputs.client_sock.local_addr()
+            ))?;
+        debug!(
+            "{} Bench client got {} bytes from {:?} to {:?}",
+            idx, recvsize, recvfrom, inputs.client_sock.local_addr()
+        );
         if inputs.response_buffer[0] != STUB_REQUEST[0]
             || inputs.response_buffer[1] != STUB_REQUEST[1]
         {
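With the switch away from SyncUdpSocket, the whole round trip above runs on async_net's UdpSocket. A self-contained sketch of that send/recv flow (endpoints and payload hypothetical):

use async_net::UdpSocket;

fn main() -> anyhow::Result<()> {
    smol::block_on(async {
        let server = UdpSocket::bind("127.0.0.1:0").await?;
        let client = UdpSocket::bind("127.0.0.1:0").await?;

        // Client sends a request and the server echoes it back.
        client.send_to(b"ping", server.local_addr()?).await?;
        let mut buf = vec![0u8; 64]; // sized (not just reserved) for recv_from
        let (n, from) = server.recv_from(&mut buf).await?;
        server.send_to(&buf[..n], from).await?;

        let (n, _) = client.recv_from(&mut buf).await?;
        assert_eq!(&buf[..n], b"ping");
        Ok(())
    })
}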


@@ -313,9 +331,9 @@ fn run_udp_udp_test(
     group.sample_size(samples);
     group.bench_function(name, |b| {
         b.iter_batched(
-            move || setup_udp_requests(run_count).expect("setup failed"),
+            move || smol::block_on(setup_udp_requests(run_count)).expect("setup failed"),
             move |inputs_vec| {
-                run_udp_requests(inputs_vec, kapiti_udp_endpoint).expect("client run failed")
+                smol::block_on(run_udp_requests(inputs_vec, kapiti_udp_endpoint)).expect("client run failed")
            },
             BatchSize::LargeInput,
         )
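Since iter_batched takes synchronous closures, the now-async setup and run functions are driven to completion with smol::block_on at the closure boundary; setup stays outside the timed section and only the routine is measured. The shape of the pattern, reduced to hypothetical setup/run functions:

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};

async fn setup() -> Vec<u64> {
    (0..1000).collect()
}

async fn run(inputs: Vec<u64>) -> u64 {
    inputs.into_iter().sum()
}

fn bench(c: &mut Criterion) {
    c.bench_function("sum_batched", |b| {
        b.iter_batched(
            // Setup is excluded from the measurement...
            || smol::block_on(setup()),
            // ...while the routine itself is timed.
            |inputs| smol::block_on(run(inputs)),
            BatchSize::LargeInput,
        )
    });
}

criterion_group!(benches, bench);
criterion_main!(benches);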

M src/config.rs => src/config.rs +11 -1
@@ -61,6 +61,11 @@ pub struct Config {
     #[serde(default = "default_user")]
     pub user: String,
 
+    /// Number of worker threads for processing queries. Higher values allow more queries
+    /// to be handled simultaneously at the cost of more resource consumption.
+    #[serde(default = "default_workers")]
+    pub workers: u64,
+
     /// IP+Port endpoint for the DNS server.
     /// Defaults to `0.0.0.0:53`.
     #[serde(default = "default_listen_dns")]


@@ -134,11 +139,12 @@ impl Config {
             storage: storage.to_string(),
             // Disable user downgrade to avoid system-specific issues (what if 'nobody' doesn't exist in the test environment?)
             user: "".to_string(),
+            workers: default_workers(),
             listen_dns: listen_random.clone(),
             listen_http: listen_random.clone(),
             listen_https: listen_random,
             upstreams: vec![upstream],
-            filter_refresh_seconds: 3600,
+            filter_refresh_seconds: default_filter_refresh_seconds(),
             filters: HashMap::new(),
             overrides: Vec::new(),
             blocks: Vec::new(),


@@ -156,6 +162,10 @@ fn default_user() -> String {
     "nobody".to_string()
 }
 
+fn default_workers() -> u64 {
+    10
+}
+
 fn default_listen_dns() -> String {
     "0.0.0.0:53".to_string()
 }
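The new workers field follows the crate's existing serde pattern: #[serde(default = "...")] names a free function that supplies the value when the field is absent from the config file. A minimal sketch of the mechanism (using serde_json purely for illustration):

use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct Config {
    #[serde(default = "default_workers")]
    workers: u64,
}

fn default_workers() -> u64 {
    10
}

fn main() {
    // A document without the field deserializes with the default filled in.
    let cfg: Config = serde_json::from_str("{}").expect("parse failed");
    assert_eq!(cfg.workers, 10);

    // An explicit value still wins.
    let cfg: Config = serde_json::from_str(r#"{"workers": 4}"#).expect("parse failed");
    assert_eq!(cfg.workers, 4);
}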

M src/listen_tcp.rs => src/listen_tcp.rs +2 -1
@@ -19,7 +19,7 @@ pub async fn listen_tcp(
 ) -> Result<()> {
     loop {
         let (tcp_stream, request_source) = tcp_listener.accept().await?;
-        trace!("Raw TCP stream from {:?}", request_source);
+        trace!("Queueing raw TCP stream from {:?}", request_source);
         let msg = runner::RequestMsg {
             src: request_source,
             data: runner::RequestData::Tcp(tcp_stream),


@@ -90,6 +90,7 @@ pub async fn handle_tcp_request(
     }
 
     // Send the response back to the client, prefaced by the u16 payload size
+    trace!("Raw response to {:?} ({}+2b): {:02X?}", request_source, tcp_buf.len(), &tcp_buf[..]);
     BigEndian::write_u16(&mut message_size_bytes, tcp_buf.len() as u16);
     if let Err(ioerr) = tcp_stream
         .write_all(&message_size_bytes)
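The new trace line reports the payload as {}+2b because DNS over TCP frames every message with a two-byte big-endian length prefix (RFC 1035 section 4.2.2), written here via byteorder. A sketch of just the framing step:

use byteorder::{BigEndian, ByteOrder};

fn main() {
    let tcp_buf = vec![0u8; 300]; // stand-in for an encoded DNS response
    let mut message_size_bytes = [0u8; 2];
    BigEndian::write_u16(&mut message_size_bytes, tcp_buf.len() as u16);
    // The prefix goes on the wire first, then the payload: hence "+2b".
    assert_eq!(message_size_bytes, [0x01, 0x2C]); // 300 == 0x012C
}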

M src/listen_udp.rs => src/listen_udp.rs +4 -4
@@ -3,7 +3,7 @@ use std::net::SocketAddr;
 use anyhow::Result;
 use async_net::UdpSocket;
 use bytes::BytesMut;
-use tracing::{self, debug, warn};
+use tracing::{self, trace, warn};
 
 use crate::{lookup, runner};
 


@@ -23,8 +23,8 @@
         // Got a request from somewhere, shorten to actual size received (doesn't affect malloc)
         packet_buffer.truncate(recvsize);
 
-        debug!(
-            "Raw UDP request from {:?} ({}b): {:02X?}",
+        trace!(
+            "Queueing raw UDP request from {:?} ({}b): {:02X?}",
             request_source,
             packet_buffer.len(),
             &packet_buffer[..]


@@ -48,7 +48,7 @@ pub async fn handle_udp_request(
     match lookup.handle_query(&mut buf).await {
         Ok(()) => {
             // Send the response back to the original UDP requestor.
-            debug!("Raw response to {:?} ({}b): {:02X?}", request_source, buf.len(), &buf[..]);
+            trace!("Raw response to {:?} ({}b): {:02X?}", request_source, buf.len(), &buf[..]);
             if let Err(e) = udp_sock.send_to(&mut buf, &request_source).await {
                 warn!(
                     "Failed to send UDP DNS response to client={:?}: {:?}",
M src/runner.rs => src/runner.rs +13 -9
@@ -220,11 +220,11 @@ impl Runner {
             }
             thread_handles.push(
                 thread::Builder::new()
-                    .name("filter-reload".to_string())
+                    .name("filter-worker".to_string())
                     .spawn(move || {
                         smol::block_on(async move {
                             let fetch_client = hyper_smol::client_kapiti(resolver, false, false, 4096);
-                            let span = tracing::info_span!("update-filters");
+                            let span = tracing::info_span!("filter-worker");
                             let _enter = span.enter();
                             // Ensure that the filters are loaded in on the first pass, even if they
                             // weren't redownloaded due to an up-to-date local cached copy.


@@ -253,16 +253,20 @@
 
         // Start independent threads to handle received requests, query upstreams, and send back responses
         let response_timeout = Duration::from_millis(1000);
-        for i in 0..10 {
+        for i in 0..self.config.workers {
             let mut lookup = lookup::Lookup::new(
                 client::upstream::parse_upstreams(cache_tx.clone(), &self.config.upstreams)?,
                 filter.clone(),
             );
             let server_query_rx_copy = server_query_rx.clone();
             let udp_sock_copy = self.udp_sock.clone();
+            let idx = i;
             let thread_fn = move || {
                 smol::block_on(async move {
                     let mut tcp_buf = BytesMut::with_capacity(MAX_TCP_BYTES as usize);
+                    // Shows up as 'query-worker{idx=5}' in logs
+                    let span = tracing::info_span!("query-worker", idx);
+                    let _enter = span.enter();
                     loop {
                         if !handle_next_request(
                             &server_query_rx_copy,


@@ -271,6 +275,7 @@
                             &udp_sock_copy,
                             &response_timeout
                         ).await {
+                            warn!("Worker thread exiting, this is expected only if we're shutting down");
                             break;
                         }
                     }


@@ -278,7 +283,7 @@
             };
             thread_handles.push(
                 thread::Builder::new()
-                    .name(format!("query-exec-{}", i))
+                    .name(format!("query-worker-{}", i))
                     .spawn(thread_fn)?,
             );
         }
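The idx passed to info_span! above is captured as a structured field, which is what yields the query-worker{idx=5} prefix mentioned in the new comment. A standalone sketch:

use tracing::{info, info_span};

fn main() {
    tracing_subscriber::fmt().init();
    let idx = 5;
    // Shorthand field capture: equivalent to info_span!("query-worker", idx = idx).
    let span = info_span!("query-worker", idx);
    let _enter = span.enter();
    // Rendered by the fmt subscriber as: query-worker{idx=5}: handling request
    info!("handling request");
}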


@@ -421,8 +426,9 @@ async fn handle_next_request(
     let request: RequestMsg;
     match server_query_rx.lock() {
         Err(e) => {
+            // Not expecting this to happen, but just in case let's try to keep going
             warn!("Failed to lock receive queue, trying again: {:?}", e);
-            return true;
+            return true; // Continue thread
         }
         Ok(server_query_rx_lock) => {
             // Grab a request from the queue, then release the lock


@@ -430,13 +436,11 @@
                 request = got_request;
             } else {
                 info!("Exiting handler thread: request queue has closed.");
-                return false;
+                return false; // EXIT THREAD
             }
         }
     }
 
-    let span = tracing::info_span!("handle-query");
-    let _enter = span.enter();
     match request.data {
         RequestData::Udp(buf) => {
             listen_udp::handle_udp_request(lookup, request.src, buf, udp_sock)


@@ -455,5 +459,5 @@
             tcp_buf.resize(MAX_TCP_BYTES as usize, 0);
         }
     }
-    false
+    true // Continue thread
 }
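The boolean now spells out the worker-loop contract: true keeps the thread in its loop, false (the queue has closed) shuts it down. The same shutdown-by-closing-the-queue shape in miniature, using async-channel purely for illustration:

fn main() {
    let (tx, rx) = async_channel::unbounded::<u32>();
    let worker = std::thread::spawn(move || {
        smol::block_on(async move {
            loop {
                match rx.recv().await {
                    Ok(req) => println!("handled request {}", req),
                    Err(_) => break, // queue closed: exit thread
                }
            }
        })
    });
    smol::block_on(tx.send(7)).unwrap();
    drop(tx); // dropping the last sender closes the queue and stops the worker
    worker.join().unwrap();
}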