#![deny(warnings)]
use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket as SyncUdpSocket};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use anyhow::{anyhow, bail, Context, Result};
use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use futures::{future, task};
use lazy_static::lazy_static;
use packed_struct::prelude::*;
use tempfile;
use tokio::net;
use tokio::runtime::Runtime;
use tokio::sync::oneshot;
use tokio::time;
use tracing::{debug, error};
use kapiti::codec::{domain_name, encoder::DNSMessageEncoder, message};
use kapiti::config::Config;
use kapiti::fbs::dns_enums_generated::{ResourceClass, ResourceType, ResponseCode};
use kapiti::fbs::dns_message_generated::{Question, QuestionArgs};
use kapiti::logging;
use kapiti::runner::Runner;
const LOCAL_EPHEMERAL_ENDPOINT: &str = "127.0.0.1:0";
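// Stub question and the canned A-record answer returned by the fake upstream.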
const STUB_QUERY_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(1, 2, 4, 8));
const STUB_QUERY_NAME: &str = "kapiti.io.";
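// Simulated upstream response latencies for the "fast" and "slow" benchmark variants.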
const UPSTREAM_LATENCY_FAST: Duration = Duration::from_millis(1);
const UPSTREAM_LATENCY_SLOW: Duration = Duration::from_millis(25);
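// Encode the stub request once; clients reuse it and match responses by its leading ID bytes.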
lazy_static! {
static ref STUB_REQUEST: BytesMut =
write_stub_request().expect("Failed to create stub request");
}
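/// Builds a FlatBuffer containing a single question for an A record on STUB_QUERY_NAME.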
fn build_stub_question<'a>() -> flatbuffers::FlatBufferBuilder<'a> {
let mut fbb = flatbuffers::FlatBufferBuilder::new_with_capacity(1024);
let question_args = QuestionArgs {
name: Some(fbb.create_string(STUB_QUERY_NAME)),
resource_type: ResourceType::TYPE_A as u16,
resource_class: ResourceClass::CLASS_INTERNET as u16,
};
let question_offset = Question::create(&mut fbb, &question_args);
fbb.finish_minimal(question_offset);
fbb
}
/// Builds a stub DNS response/answer and returns the encoded buffer
fn write_stub_response() -> Result<BytesMut> {
let question_fbb = build_stub_question();
let question: Question<'_> =
flatbuffers::get_root::<Question<'_>>(question_fbb.finished_data());
let mut buf = BytesMut::with_capacity(4096);
DNSMessageEncoder::new().encode_local_response(
ResponseCode::RESPONSE_NOERROR,
0,
&question,
None,
Some(STUB_QUERY_IP),
None,
&mut buf,
)?;
Ok(buf)
}
/// Builds a stub DNS request/question and returns the encoded buffer
fn write_stub_request() -> Result<BytesMut> {
let mut buf = BytesMut::with_capacity(4096);
message::write_header_bits(
message::HeaderBits {
id: 12345,
is_response: false, // QR=0: this is a query, not a response
op_code: Integer::from(0 /*QUERY*/),
authoritative: false,
truncated: false,
recursion_desired: true,
recursion_available: false, // RA is only meaningful in responses
reserved_9: false,
authentic_data: false,
checking_disabled: false,
response_code: Integer::from(ResponseCode::RESPONSE_NOERROR as u8),
question_count: 1,
answer_count: 0,
authority_count: 0,
additional_count: 0,
},
&mut buf,
)?;
let question_fbb = build_stub_question();
let question: Question<'_> =
flatbuffers::get_root::<Question<'_>>(question_fbb.finished_data());
let mut ptr_offsets = domain_name::LabelOffsets::new();
message::write_question(&question, &mut buf, &mut ptr_offsets)?;
Ok(buf)
}
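/// Where to send a delayed stub response, plus the two request ID bytes to echo back in it.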
#[derive(Debug)]
struct ResponseInfo {
dest: SocketAddr,
request0: u8,
request1: u8,
}
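/// Events produced by the stub upstream's polling loop.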
enum UpstreamEvent {
GotRequest(ResponseInfo),
ResponseReady(ResponseInfo),
Stop,
}
/// Hack to get around borrow checker issues. Ideally we'd have a Stream with these as members but then the compiler breaks:
/// error[E0502]: cannot borrow `self` as immutable because it is also borrowed as mutable
/// --> benches/server.rs:156:41
/// |
/// 155 | let buf = &mut self.request_buffer;
/// | ---- mutable borrow occurs here
/// 156 | if let task::Poll::Ready(rdy) = self.udp_sock.poll_recv_from(cx, buf) {
/// | ^^^^ --- mutable borrow later used here
/// | |
/// | immutable borrow occurs here
struct UpstreamEventMembers<'a> {
queue: &'a mut time::delay_queue::DelayQueue<ResponseInfo>,
udp_sock: &'a net::UdpSocket,
request_buffer: &'a mut [u8; 4096],
stop: &'a mut oneshot::Receiver<()>,
}
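/// Future that yields the next UpstreamEvent by polling the response queue, the UDP socket,
/// and the stop channel, in that order.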
struct UpstreamEventFuture<'a> {
m: UpstreamEventMembers<'a>,
}
impl<'a> UpstreamEventFuture<'a> {
fn new(m: UpstreamEventMembers<'a>) -> UpstreamEventFuture<'a> {
UpstreamEventFuture { m }
}
}
impl<'a> future::Future for UpstreamEventFuture<'a> {
type Output = Result<UpstreamEvent>;
fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
// Check if the delay queue has something ready to send
if let task::Poll::Ready(Some(rdy)) = self.m.queue.poll_expired(cx) {
let msg = rdy
.map(|msg| UpstreamEvent::ResponseReady(msg.into_inner()))
.with_context(|| "Failed to read from queue");
return task::Poll::Ready(msg);
}
// Check if the socket has something to receive
if let task::Poll::Ready(rdy) = self.m.udp_sock.poll_recv_from(cx, self.m.request_buffer) {
match rdy {
Ok((recvsize, dest)) => {
// The response must echo the request ID (first two bytes), so require at least 2 bytes
if recvsize < 2 {
return task::Poll::Ready(Err(anyhow!(
"Expected request to have at least 2 bytes, but got {}",
recvsize
)));
}
let msg = ResponseInfo {
dest,
request0: self.m.request_buffer[0],
request1: self.m.request_buffer[1],
};
return task::Poll::Ready(Ok(UpstreamEvent::GotRequest(msg)));
},
Err(e) => {
return task::Poll::Ready(Err(e).with_context(|| "Failed to read from socket"));
},
}
}
// Check if we should be stopping
match self.m.stop.try_recv() {
Ok(()) => {
// Finish all requests/responses, THEN stop
debug!("Stopping upstream");
assert!(self.m.queue.is_empty(), "Should have had an empty queue before getting stop signal");
return task::Poll::Ready(Ok(UpstreamEvent::Stop));
},
Err(oneshot::error::TryRecvError::Empty) => {},
Err(oneshot::error::TryRecvError::Closed) => error!("stop channel was closed"),
}
task::Poll::Pending
}
}
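/// Runs a stub upstream DNS server: accepts requests, and after `latency` elapses, replies
/// with a canned response carrying the original request's ID.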
async fn run_udp_upstream(mut udp_sock: net::UdpSocket, latency: Duration, mut stop: oneshot::Receiver<()>) -> Result<()> {
let mut response_buffer = write_stub_response()
.with_context(|| "Failed to construct response buffer")?;
tokio::spawn(async move {
let mut request_buffer = [0; 4096];
let mut queue = time::delay_queue::DelayQueue::new();
loop {
let members = UpstreamEventMembers {
queue: &mut queue,
udp_sock: &udp_sock,
request_buffer: &mut request_buffer,
stop: &mut stop,
};
// Timeout needed in order to shut down cleanly: stop.try_recv() never registers a waker,
// so without the timeout the future is never woken to notice the "stop" message.
match time::timeout(
Duration::from_millis(1000),
UpstreamEventFuture::new(members),
).await {
Ok(Ok(UpstreamEvent::GotRequest(response_info))) => {
debug!("Harness got request from {:?}", response_info.dest);
queue.insert(response_info, latency);
},
Ok(Ok(UpstreamEvent::ResponseReady(response_info))) => {
// Send the response we got back to the original requestor.
response_buffer[0] = response_info.request0;
response_buffer[1] = response_info.request1;
// Shouldn't time out but just in case...
let sendsize = time::timeout(
Duration::from_millis(1000),
udp_sock.send_to(&response_buffer, &response_info.dest),
).await
.expect("Timed out sending DNS response to oneshot output")
.expect("Failed to send DNS response to oneshot output");
debug!("Harness sent {} bytes to {:?}", sendsize, response_info.dest);
},
Ok(Ok(UpstreamEvent::Stop)) => {
debug!("Harness got stop signal, exiting");
return Ok(());
},
Ok(Err(e)) => {
error!("Error while waiting for upstream event: {}", e);
}
Err(e) => {
debug!("Timed out while waiting for upstream event: {}", e);
}
}
}
}).await?
}
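/// Binds the stub upstream to an ephemeral local port and runs it on a dedicated thread.
/// Returns the bound address along with the thread's join handle.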
fn start_udp_upstream(
latency: Duration,
stop: oneshot::Receiver<()>,
) -> Result<(SocketAddr, JoinHandle<Result<()>>)> {
let listen_addr_ephemeral = LOCAL_EPHEMERAL_ENDPOINT
.to_socket_addrs()?
.next()
.with_context(|| "Invalid listen address")?;
let mut runtime = Runtime::new()?;
let upstream_sock = runtime.block_on(net::UdpSocket::bind(listen_addr_ephemeral))
.with_context(|| format!("Failed to listen on {}", listen_addr_ephemeral))?;
let listen_addr_actual = upstream_sock.local_addr()?;
debug!("Harness running at {:?}", listen_addr_actual);
Ok((
listen_addr_actual,
thread::spawn(move || {
let result = runtime.block_on(run_udp_upstream(upstream_sock, latency, stop))
.with_context(|| "run_udp_upstream failed");
debug!("run_udp_upstream thread exited");
result
}),
))
}
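/// Per-request state for one benchmark client: a socket to send from and a buffer to receive into.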
struct RunInputs {
client_sock: SyncUdpSocket,
response_buffer: BytesMut,
}
fn setup_udp_requests(count: u64) -> Result<Vec<RunInputs>> {
let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
let mut inputs_vec = Vec::new();
for _i in 0..count {
let client_sock = SyncUdpSocket::bind(client_addr)?;
client_sock.set_read_timeout(Some(Duration::from_millis(5000)))?;
client_sock.set_write_timeout(Some(Duration::from_millis(5000)))?;
let mut response_buffer = BytesMut::with_capacity(4096);
// Ensure that the buffer has a SIZE suitable for socket.recv_from().
// If we just leave it with the CAPACITY then it drops data.
response_buffer.resize(response_buffer.capacity(), 0);
inputs_vec.push(RunInputs {
client_sock,
response_buffer,
});
}
Ok(inputs_vec)
}
/// Sends one request per entry in `inputs_vec`, then waits for a response to each.
/// Consumes `inputs_vec` so that Criterion's `iter_batched` can `move` fresh inputs into each run.
fn run_udp_requests(
inputs_vec: Vec<RunInputs>,
kapiti_udp_endpoint: SocketAddr,
) -> Result<()> {
// Send requests for each entry in the batch
for inputs in &inputs_vec {
debug!("Bench client sending...");
let sendsize = inputs
.client_sock
.send_to(&STUB_REQUEST[..], kapiti_udp_endpoint)
.with_context(|| "send_to failed")?;
debug!(
"Bench client sent {} bytes to {:?}",
sendsize, kapiti_udp_endpoint
);
}
// Wait for responses to come back
for mut inputs in inputs_vec {
let (recvsize, recvfrom) = inputs
.client_sock
.recv_from(&mut inputs.response_buffer)
.with_context(|| "recv_from failed: no response from kapiti")?;
debug!("Bench client got {} bytes from {:?}", recvsize, recvfrom);
if inputs.response_buffer[0] != STUB_REQUEST[0]
|| inputs.response_buffer[1] != STUB_REQUEST[1]
{
bail!(
"Response doesn't have expected request ID:\n- request: {:?}\n-response: {:?}",
&STUB_REQUEST[..],
inputs.response_buffer
);
}
}
Ok(())
}
/// Requests coming in over UDP, upstream endpoint over UDP
fn run_udp_udp_test(
c: &mut Criterion,
name: &str,
upstream_latency: Duration,
samples: usize,
) -> Result<()> {
let tmpstorage = tempfile::tempdir()?;
// Use channels to notify threads to stop
let (stop_upstream_tx, stop_upstream_rx) = oneshot::channel();
let (stop_kapiti_tx, stop_kapiti_rx) = oneshot::channel();
// Start upstream harness
let (upstream_addr, upstream_join_handle) =
start_udp_upstream(upstream_latency, stop_upstream_rx)?;
let config = Config::new_for_test(
tmpstorage
.path()
.to_str()
.expect("invalid temp storage path"),
upstream_addr.to_string(),
);
// Start kapiti server
let mut runtime = Runtime::new()?;
let mut runner = runtime.block_on(Runner::new("benchmark".to_string(), config))?;
runner.set_stop(stop_kapiti_rx);
let kapiti_udp_endpoint = runner.get_udp_endpoint();
let kapiti_join_handle = thread::spawn(move || runtime.block_on(runner.run()));
// Run benchmark: See how quickly we can get responses from kapiti
let run_count: u64 = 30;
let mut group = c.benchmark_group("server");
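// Each measured batch issues `run_count` requests, so report throughput in requests per second.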
group.throughput(Throughput::Elements(run_count));
group.sample_size(samples);
group.bench_function(name, |b| {
b.iter_batched(
move || setup_udp_requests(run_count).expect("setup failed"),
move |inputs_vec| {
run_udp_requests(inputs_vec, kapiti_udp_endpoint).expect("client run failed")
},
BatchSize::LargeInput,
)
});
group.finish();
// Send shutdown messages to be consumed by the kapiti thread and the upstream thread (not necessarily in that order)
stop_upstream_tx
.send(())
.expect("unable to notify upstream to stop");
stop_kapiti_tx
.send(())
.expect("unable to notify kapiti to stop");
// Wait for kapiti thread to get the message
debug!("Waiting for kapiti thread to exit...");
kapiti_join_handle
.join()
.expect("failed to join kapiti thread")?;
// Wait for upstream thread to get the message
debug!("Waiting for upstream thread to exit...");
upstream_join_handle
.join()
.expect("failed to join upstream harness thread")?;
Ok(())
}
fn udp_udp_fast(c: &mut Criterion) {
logging::init_logging(); // init once in first bench, then leave alone
run_udp_udp_test(c, "fast", UPSTREAM_LATENCY_FAST, 50).expect("udp_udp test failed");
}
fn udp_udp_slow(c: &mut Criterion) {
run_udp_udp_test(c, "slow", UPSTREAM_LATENCY_SLOW, 10).expect("udp_udp_slow test failed");
}
criterion_group!(benches, udp_udp_fast, udp_udp_slow);
criterion_main!(benches);