M benches/server.rs => benches/server.rs +271 -93
@@ 1,19 1,21 @@
-#![deny(warnings, rust_2018_idioms)]
-use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket};
-use std::sync::Arc;
+use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket as SyncUdpSocket};
use std::thread::{self, JoinHandle};
use std::time::Duration;
-use anyhow::{bail, Context, Result};
+use anyhow::{anyhow, bail, Context, Result};
use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
-use crossbeam_queue::ArrayQueue;
+use futures::{future, task};
use lazy_static::lazy_static;
use packed_struct::prelude::*;
use tempfile;
+use tokio::net;
use tokio::runtime::Runtime;
-use tracing::debug;
+use tokio::sync::oneshot;
+use tokio::time;
+use tracing::{debug, error};
use kapiti::codec::{domain_name, encoder::DNSMessageEncoder, message};
use kapiti::config::Config;
@@ 27,8 29,12 @@ const LOCAL_EPHEMERAL_ENDPOINT: &str = "";
const STUB_QUERY_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(1, 2, 4, 8));
const STUB_QUERY_NAME: &str = "kapiti.io.";
+const UPSTREAM_LATENCY_FAST: Duration = Duration::from_millis(1);
+const UPSTREAM_LATENCY_SLOW: Duration = Duration::from_millis(25);
lazy_static! {
- static ref STUB_REQUEST: BytesMut = write_stub_request().expect("Failed to create stub request");
+ static ref STUB_REQUEST: BytesMut =
+ write_stub_request().expect("Failed to create stub request");
fn build_stub_question<'a>() -> flatbuffers::FlatBufferBuilder<'a> {
@@ 46,7 52,8 @@ fn build_stub_question<'a>() -> flatbuffers::FlatBufferBuilder<'a> {
/// Writes a DNS response/answer into the provided buffer
fn write_stub_response() -> Result<BytesMut> {
let question_fbb = build_stub_question();
- let question: Question<'_> = flatbuffers::get_root::<Question<'_>>(question_fbb.finished_data());
+ let question: Question<'_> =
+ flatbuffers::get_root::<Question<'_>>(question_fbb.finished_data());
let mut buf = BytesMut::with_capacity(4096);
@@ 56,7 63,7 @@ fn write_stub_response() -> Result<BytesMut> {
- &mut buf
+ &mut buf,
@@ 89,155 96,326 @@ fn write_stub_request() -> Result<BytesMut> {
let question_fbb = build_stub_question();
- let question: Question<'_> = flatbuffers::get_root::<Question<'_>>(question_fbb.finished_data());
+ let question: Question<'_> =
+ flatbuffers::get_root::<Question<'_>>(question_fbb.finished_data());
let mut ptr_offsets = domain_name::LabelOffsets::new();
message::write_question(&question, &mut buf, &mut ptr_offsets)?;
-fn run_udp_upstream(udp_sock: UdpSocket, stop: Arc<ArrayQueue<()>>) -> Result<()> {
- let mut request_buffer = BytesMut::with_capacity(4096);
- let mut response_buffer = write_stub_response()?;
+struct ResponseInfo {
+ dest: SocketAddr,
+ request0: u8,
+ request1: u8,
- loop {
- // Ensure that the buffer has a SIZE suitable for socket.recv_from().
- // If we just leave it with the CAPACITY then it drops data.
- request_buffer.resize(request_buffer.capacity(), 0);
- debug!("Harness reading...");
- match udp_sock.recv_from(&mut request_buffer) {
- Ok((recvsize, recvfrom)) => {
- debug!("Harness got {} bytes from {:?}", recvsize, recvfrom);
- // Ensure that the response has a matching request ID (first two bytes)
- if recvsize < 2 {
- bail!("Expected request to have at least 2 bytes, but got {}", recvsize);
- }
- response_buffer[0] = request_buffer[0];
- response_buffer[1] = request_buffer[1];
+enum UpstreamEvent {
+ GotRequest(ResponseInfo),
+ ResponseReady(ResponseInfo),
+ Stop,
- request_buffer.clear();
+/// Hack to get around borrow checker issues. Ideally we'd have a Stream with these as members but then the compiler breaks:
+/// error[E0502]: cannot borrow `self` as immutable because it is also borrowed as mutable
+/// --> benches/server.rs:156:41
+/// |
+/// 155 | let buf = &mut self.request_buffer;
+/// | ---- mutable borrow occurs here
+/// 156 | if let task::Poll::Ready(rdy) = self.udp_sock.poll_recv_from(cx, buf) {
+/// | ^^^^ --- mutable borrow later used here
+/// | |
+/// | immutable borrow occurs here
+struct UpstreamEventMembers<'a> {
+ queue: &'a mut time::delay_queue::DelayQueue<ResponseInfo>,
+ udp_sock: &'a net::UdpSocket,
+ request_buffer: &'a mut [u8; 4096],
+ stop: &'a mut oneshot::Receiver<()>,
- debug!("Harness replying...");
- let sendsize = udp_sock.send_to(&mut response_buffer, recvfrom)?;
- debug!("Harness sent {} bytes to {:?}", sendsize, recvfrom);
+struct UpstreamEventFuture<'a> {
+ m: UpstreamEventMembers<'a>,
+impl<'a> UpstreamEventFuture<'a> {
+ fn new(m: UpstreamEventMembers<'a>) -> UpstreamEventFuture<'a> {
+ UpstreamEventFuture {
+ m
+ }
+ }
+impl<'a> future::Future for UpstreamEventFuture<'a> {
+ type Output = Result<UpstreamEvent>;
+ fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
+ // Check if the delay queue has something ready to send
+ if let task::Poll::Ready(Some(rdy)) = self.m.queue.poll_expired(cx) {
+ let msg = rdy
+ .map(|msg| UpstreamEvent::ResponseReady(msg.into_inner()))
+ .with_context(|| "Failed to read from queue");
+ return task::Poll::Ready(msg);
+ }
+ // Check if the socket has something to receive
+ if let task::Poll::Ready(rdy) = self.m.udp_sock.poll_recv_from(cx, self.m.request_buffer) {
+ match rdy {
+ Ok((recvsize, dest)) => {
+ // Ensure that the response has a matching request ID (first two bytes)
+ if recvsize < 2 {
+ return task::Poll::Ready(Err(anyhow!(
+ "Expected request to have at least 2 bytes, but got {}",
+ recvsize
+ )));
+ }
+ let msg = ResponseInfo {
+ dest,
+ request0: self.m.request_buffer[0],
+ request1: self.m.request_buffer[1],
+ };
+ return task::Poll::Ready(Ok(UpstreamEvent::GotRequest(msg)));
+ },
+ Err(e) => {
+ return task::Poll::Ready(Err(e).with_context(|| "Failed to read from socket"));
+ },
+ }
+ }
+ // Check if we should be stopping
+ match self.m.stop.try_recv() {
+ Ok(()) => {
+ // Finish all requests/responses, THEN stop
+ debug!("Stopping upstream");
+ assert!(self.m.queue.is_empty(), "Should have had an empty queue before getting stop signal");
+ return task::Poll::Ready(Ok(UpstreamEvent::Stop));
- Err(_e) => {
- // Might be a timeout where upstream is waiting for us to exit, or might not be
- if stop.pop().is_ok() {
- debug!("Stopping harness thread");
+ Err(oneshot::error::TryRecvError::Empty) => {},
+ Err(oneshot::error::TryRecvError::Closed) => error!("stop channel was closed"),
+ }
+ task::Poll::Pending
+ }
+async fn run_udp_upstream(mut udp_sock: net::UdpSocket, latency: Duration, mut stop: oneshot::Receiver<()>) -> Result<()> {
+ let mut response_buffer = write_stub_response()
+ .expect("Failed to construct response buffer");
+ tokio::spawn(async move {
+ let mut request_buffer = [0; 4096];
+ let mut queue = time::delay_queue::DelayQueue::new();
+ loop {
+ let members = UpstreamEventMembers {
+ queue: &mut queue,
+ udp_sock: &udp_sock,
+ request_buffer: &mut request_buffer,
+ stop: &mut stop,
+ };
+ // Timeout needed in order to shut down cleanly.
+ // Otherwise it just gets stuck despite "stop" having a message for us.
+ match time::timeout(
+ Duration::from_millis(1000),
+ UpstreamEventFuture::new(members),
+ ).await {
+ Ok(Ok(UpstreamEvent::GotRequest(response_info))) => {
+ debug!("Harness got request from {:?}", response_info.dest);
+ queue.insert(response_info, latency);
+ },
+ Ok(Ok(UpstreamEvent::ResponseReady(response_info))) => {
+ // Send the response we got back to the original requestor.
+ response_buffer[0] = response_info.request0;
+ response_buffer[1] = response_info.request1;
+ // Shouldn't time out but just in case...
+ let sendsize = time::timeout(
+ Duration::from_millis(1000),
+ udp_sock.send_to(&mut response_buffer, &response_info.dest),
+ ).await
+ .expect("Timed out sending DNS response to oneshot output")
+ .expect("Failed to send DNS response to oneshot output");
+ debug!("Harness sent {} bytes to {:?}", sendsize, response_info.dest);
+ },
+ Ok(Ok(UpstreamEvent::Stop)) => {
+ debug!("Harness got stop signal, exiting");
return Ok(());
+ },
+ Ok(Err(e)) => {
+ error!("Error while waiting for upstream event: {}", e);
+ }
+ Err(e) => {
+ debug!("Timed out while waiting for upstream event: {}", e);
- }
+ }).await?
-fn start_udp_upstream(stop: Arc<ArrayQueue<()>>) -> Result<(SocketAddr, JoinHandle<Result<()>>)> {
+fn start_udp_upstream(
+ latency: Duration,
+ stop: oneshot::Receiver<()>,
+) -> Result<(SocketAddr, JoinHandle<Result<()>>)> {
let listen_addr_ephemeral = LOCAL_EPHEMERAL_ENDPOINT
.with_context(|| "Invalid listen address")?;
- let upstream_sock = UdpSocket::bind(listen_addr_ephemeral)
+ let mut runtime = Runtime::new()?;
+ let upstream_sock = runtime.block_on(net::UdpSocket::bind(listen_addr_ephemeral))
.with_context(|| format!("Failed to listen on {}", listen_addr_ephemeral))?;
- upstream_sock.set_read_timeout(Some(Duration::from_millis(1000)))?;
- upstream_sock.set_write_timeout(Some(Duration::from_millis(1000)))?;
let listen_addr_actual = upstream_sock.local_addr()?;
debug!("Harness running at {:?}", listen_addr_actual);
- Ok((listen_addr_actual, thread::spawn(move || {
- run_udp_upstream(upstream_sock, stop)
- .with_context(|| format!("run_udp_upstream failed"))
- })))
+ Ok((
+ listen_addr_actual,
+ thread::spawn(move || {
+ let result = runtime.block_on(run_udp_upstream(upstream_sock, latency, stop))
+ .with_context(|| format!("run_udp_upstream failed"));
+ debug!("run_udp_upstream thread exited");
+ return result;
+ }),
+ ))
struct RunInputs {
- client_sock: UdpSocket,
+ client_sock: SyncUdpSocket,
response_buffer: BytesMut,
-fn setup_udp_requests() -> Result<RunInputs> {
- let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127,0,0,1)), 0);
- let client_sock = UdpSocket::bind(client_addr)?;
- client_sock.set_read_timeout(Some(Duration::from_millis(5000)))?;
- client_sock.set_write_timeout(Some(Duration::from_millis(5000)))?;
- let response_buffer = BytesMut::with_capacity(4096);
- Ok(RunInputs {
- client_sock,
- response_buffer,
- })
+fn setup_udp_requests(count: u64) -> Result<Vec<RunInputs>> {
+ let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
+ let mut inputs_vec = Vec::new();
+ for _i in 0..count {
+ let client_sock = SyncUdpSocket::bind(client_addr)?;
+ client_sock.set_read_timeout(Some(Duration::from_millis(5000)))?;
+ client_sock.set_write_timeout(Some(Duration::from_millis(5000)))?;
+ let mut response_buffer = BytesMut::with_capacity(4096);
+ // Ensure that the buffer has a SIZE suitable for socket.recv_from().
+ // If we just leave it with the CAPACITY then it drops data.
+ response_buffer
+ .resize(response_buffer.capacity(), 0);
+ inputs_vec.push(RunInputs {
+ client_sock,
+ response_buffer,
+ });
+ }
+ Ok(inputs_vec)
/// Sets up and runs `count` requests, waiting for a response after each request.
/// Reinitializes every time so that `move` will work.
-fn run_udp_requests(mut inputs: RunInputs, kapiti_udp_endpoint: SocketAddr, count: u64) -> Result<()> {
- for _i in 0..count {
- debug!("Harness sending...");
- let sendsize = inputs.client_sock.send_to(&STUB_REQUEST[..], kapiti_udp_endpoint).with_context(|| "send_to failed")?;
- debug!("Harness sent {} bytes to {:?}", sendsize, kapiti_udp_endpoint);
- // Ensure that the buffer has a SIZE suitable for socket.recv_from().
- // If we just leave it with the CAPACITY then it drops data.
- inputs.response_buffer.resize(inputs.response_buffer.capacity(), 0);
- debug!("Client reading...");
- let (recvsize, recvfrom) = inputs.client_sock.recv_from(&mut inputs.response_buffer).with_context(|| "recv_from failed: no response from kapiti")?;
- debug!("Client got {} bytes from {:?}", recvsize, recvfrom);
- if inputs.response_buffer[0] != STUB_REQUEST[0] || inputs.response_buffer[1] != STUB_REQUEST[1] {
- bail!("Response doesn't have expected request ID:\n- request: {:?}\n-response: {:?}", &STUB_REQUEST[..], inputs.response_buffer);
+fn run_udp_requests(
+ inputs_vec: Vec<RunInputs>,
+ kapiti_udp_endpoint: SocketAddr,
+) -> Result<()> {
+ // Send requests for each entry in the batch
+ for inputs in &inputs_vec {
+ debug!("Bench client sending...");
+ let sendsize = inputs
+ .client_sock
+ .send_to(&STUB_REQUEST[..], kapiti_udp_endpoint)
+ .with_context(|| "send_to failed")?;
+ debug!(
+ "Bench client sent {} bytes to {:?}",
+ sendsize, kapiti_udp_endpoint
+ );
+ }
+ // Wait for responses to come back
+ for mut inputs in inputs_vec {
+ let (recvsize, recvfrom) = inputs
+ .client_sock
+ .recv_from(&mut inputs.response_buffer)
+ .with_context(|| "recv_from failed: no response from kapiti")?;
+ debug!("Bench client got {} bytes from {:?}", recvsize, recvfrom);
+ if inputs.response_buffer[0] != STUB_REQUEST[0]
+ || inputs.response_buffer[1] != STUB_REQUEST[1]
+ {
+ bail!(
+ "Response doesn't have expected request ID:\n- request: {:?}\n-response: {:?}",
+ inputs.response_buffer
+ );
/// Requests coming in over UDP, upstream endpoint over UDP
-fn run_udp_udp_test(c: &mut Criterion) -> Result<()> {
- logging::init_logging();
+fn run_udp_udp_test(
+ c: &mut Criterion,
+ name: &str,
+ upstream_latency: Duration,
+ samples: usize,
+) -> Result<()> {
let tmpstorage = tempfile::tempdir()?;
- // Use queue to notify threads to stop
- let stop = Arc::new(ArrayQueue::new(2));
+ // Use channels to notify threads to stop
+ let (stop_upstream_tx, stop_upstream_rx) = oneshot::channel();
+ let (stop_kapiti_tx, stop_kapiti_rx) = oneshot::channel();
// Start upstream harness
- let (upstream_addr, upstream_join_handle) = start_udp_upstream(stop.clone())?;
- let config = Config::new_for_test(tmpstorage.path().to_str().expect("invalid temp storage path"), upstream_addr.to_string());
+ let (upstream_addr, upstream_join_handle) =
+ start_udp_upstream(upstream_latency, stop_upstream_rx)?;
+ let config = Config::new_for_test(
+ tmpstorage
+ .path()
+ .to_str()
+ .expect("invalid temp storage path"),
+ upstream_addr.to_string(),
+ );
// Start kapiti server
let mut runtime = Runtime::new()?;
let mut runner = runtime.block_on(Runner::new("benchmark".to_string(), config))?;
- runner.set_stop(stop.clone());
- let kapiti_udp_endpoint = runner.get_udp_endpoint()?;
- let kapiti_join_handle = thread::spawn(move || {
- runtime.block_on(runner.run())
- });
+ runner.set_stop(stop_kapiti_rx);
+ let kapiti_udp_endpoint = runner.get_udp_endpoint();
+ let kapiti_join_handle = thread::spawn(move || runtime.block_on(runner.run()));
// Run benchmark: See how quickly we can get responses from kapiti
let run_count: u64 = 30;
let mut group = c.benchmark_group("server");
- group.sample_size(50);
- group.bench_function("udp_udp", |b| b.iter_batched(
- move || setup_udp_requests().expect("setup failed"),
- move |inputs| run_udp_requests(inputs, kapiti_udp_endpoint, run_count).expect("client run failed"),
- BatchSize::PerIteration
- ));
+ group.sample_size(samples);
+ group.bench_function(name, |b| {
+ b.iter_batched(
+ move || setup_udp_requests(run_count).expect("setup failed"),
+ move |inputs_vec| {
+ run_udp_requests(inputs_vec, kapiti_udp_endpoint).expect("client run failed")
+ },
+ BatchSize::LargeInput,
+ )
+ });
// Send shutdown messages to be consumed by the kapiti thread and the upstream thread (not necessarily in that order)
- stop.push(()).expect("unable to fit first message on stop queue");
- stop.push(()).expect("unable to fit second message on stop queue");
+ stop_upstream_tx
+ .send(())
+ .expect("unable to notify upstream to stop");
+ stop_kapiti_tx
+ .send(())
+ .expect("unable to notify kapiti to stop");
// Wait for kapiti thread to get the message
debug!("Waiting for kapiti thread to exit...");
- kapiti_join_handle.join().expect("failed to join kapiti thread")?;
+ kapiti_join_handle
+ .join()
+ .expect("failed to join kapiti thread")?;
// Wait for upstream thread to get the message
debug!("Waiting for upstream thread to exit...");
- upstream_join_handle.join().expect("failed to join upstream harness thread")?;
+ upstream_join_handle
+ .join()
+ .expect("failed to join upstream harness thread")?;
-fn udp_udp(c: &mut Criterion) {
- run_udp_udp_test(c).expect("udp_udp test failed");
+fn udp_udp_fast(c: &mut Criterion) {
+ logging::init_logging(); // init once in first bench, then leave alone
+ run_udp_udp_test(c, "fast", UPSTREAM_LATENCY_FAST, 50).expect("udp_udp test failed");
+fn udp_udp_slow(c: &mut Criterion) {
+ run_udp_udp_test(c, "slow", UPSTREAM_LATENCY_SLOW, 10).expect("udp_udp_slow test failed");
-criterion_group!(benches, udp_udp);
\ No newline at end of file
+criterion_group!(benches, udp_udp_fast, udp_udp_slow);
M examples/update_fbs.rs => examples/update_fbs.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
// Updates flatbuffers: fbs/*.fbs and src/fbs/*.rs. Executed via update-fbs.sh.
use std::fs::File;
M src/client/https.rs => src/client/https.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::convert::TryFrom;
use std::io::{self, Write};
M src/client/hyper.rs => src/client/hyper.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
M src/client/mod.rs => src/client/mod.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use anyhow::Result;
use async_trait::async_trait;
M src/client/tcp.rs => src/client/tcp.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::convert::TryFrom;
use std::net::SocketAddr;
M src/client/udp.rs => src/client/udp.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::net::{SocketAddr, ToSocketAddrs};
use std::time::Duration;
M src/codec/character_string.rs => src/codec/character_string.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
/// Implementation of `character-string`s as defined by RFC1035 section 3.3
use anyhow::{bail, Result};
M src/codec/decoder.rs => src/codec/decoder.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use anyhow::{bail, Context, Result};
M src/codec/display.rs => src/codec/display.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use crate::fbs::dns_enums_conv;
use crate::fbs::dns_enums_generated::ResourceType;
M src/codec/domain_name.rs => src/codec/domain_name.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
/// Implementation of `domain-name`s as defined by RFC1035 sections 3.1 and 3.3
use anyhow::{bail, Context, Result};
M src/codec/encoder.rs => src/codec/encoder.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::net::IpAddr;
M src/codec/message.rs => src/codec/message.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use anyhow::{bail, Context, Result};
use std::convert::TryFrom;
M src/codec/mod.rs => src/codec/mod.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
/// Deserializes DNS messages, returning parsed `Message` objects.
pub mod decoder;
M src/codec/rdata.rs => src/codec/rdata.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use anyhow::{bail, Context, Result};
use std::convert::TryFrom;
M src/config.rs => src/config.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::env;
use std::fmt;
M src/filter/downloader.rs => src/filter/downloader.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::convert::TryFrom;
use std::fs::{self, File};
M src/filter/filter.rs => src/filter/filter.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::convert::TryFrom;
use std::path::{Path, PathBuf};
M src/filter/mod.rs => src/filter/mod.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
/// Downloads blocklists/host files to disk
pub mod downloader;
M src/filter/path.rs => src/filter/path.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::ffi::OsString;
use std::path::Path;
M src/filter/reader.rs => src/filter/reader.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use anyhow::{bail, Context, Result};
use std::collections::HashMap;
M src/http.rs => src/http.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::convert::TryFrom;
use std::io::{self, Write};
M src/logging.rs => src/logging.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use tracing;
use tracing_subscriber::layer::SubscriberExt;
M src/main.rs => src/main.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::env;
use std::ffi::CString;
M src/runner.rs => src/runner.rs +21 -12
@@ 1,16 1,15 @@
-#![deny(warnings, rust_2018_idioms)]
use std::fs::create_dir;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::Path;
-use std::sync::Arc;
use std::time::Duration;
use anyhow::{bail, Context, Result};
use bytes::BytesMut;
-use crossbeam_queue::ArrayQueue;
use redis::{self, IntoConnectionInfo};
use tokio::net::UdpSocket;
+use tokio::sync::oneshot;
use tokio::time;
use tracing::{self, debug, info, trace, warn};
@@ 24,7 23,7 @@ pub struct Runner {
udp_sock: UdpSocket,
config_path: String,
config: config::Config,
- stop: Option<Arc<ArrayQueue<()>>>,
+ stop: Option<oneshot::Receiver<()>>,
impl Runner {
@@ 50,14 49,14 @@ impl Runner {
/// Configures the runner with a queue that will tell the runner when to stop processing requests.
/// For use in tests where we want to cleanly shut down the runner thread.
- pub fn set_stop(self: &mut Runner, stop: Arc<ArrayQueue<()>>) {
+ pub fn set_stop(self: &mut Runner, stop: oneshot::Receiver<()>) {
self.stop = Some(stop);
/// Returns the listen endpoint for the UDP socket.
/// This is for testing cases, where an ephemeral listen port is being used.
- pub fn get_udp_endpoint(self: &Runner) -> Result<SocketAddr> {
- return self.udp_sock.local_addr().with_context(|| "Couldn't get local UDP socket address");
+ pub fn get_udp_endpoint(self: &Runner) -> SocketAddr {
+ return self.udp_sock.local_addr().expect("Couldn't get local UDP socket address");
/// Runs the server. This should run until one of the following occurs:
@@ 176,7 175,7 @@ impl Runner {
let request_source: SocketAddr;
- if let Some(stop) = &self.stop {
+ if let Some(stop) = &mut self.stop {
// Shutdown support enabled, use a timeout on reads to check periodically for a shutdown signal
loop {
if let Ok(recvresult) = time::timeout(
@@ 191,10 190,20 @@ impl Runner {
// Shorten to actual size received (doesnt affect malloc)
- } else if stop.pop().is_ok() {
- // Got a timeout, and it looks like we've been told to stop.
- debug!("Stopping runner thread");
- return Ok(());
+ } else {
+ match stop.try_recv() {
+ Ok(()) => {
+ // Got a timeout, and it looks like we've been told to stop.
+ debug!("Stopping runner thread");
+ return Ok(());
+ }
+ Err(oneshot::error::TryRecvError::Empty) => {
+ // Not stopping yet
+ }
+ Err(oneshot::error::TryRecvError::Closed) => {
+ warn!("Upstream stop channel was closed early");
+ }
+ }
} else {
M src/server.rs => src/server.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use std::convert::TryFrom;
use std::net::IpAddr;
M tests/codec_property.rs => tests/codec_property.rs +1 -1
@@ 1,4 1,4 @@
-#![deny(warnings, rust_2018_idioms)]
use bytes::BytesMut;
use proptest::prelude::*;