#![deny(warnings)]
use std::fs::create_dir;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::Path;
use std::time::Duration;
use anyhow::{bail, Context, Result};
use bytes::BytesMut;
use redis::{self, IntoConnectionInfo};
use tokio::net::UdpSocket;
use tokio::sync::oneshot;
use tokio::time;
use tracing::{self, debug, info, trace, warn, Instrument};
use crate::client::{self, hyper::Resolver};
use crate::filter::filter;
use crate::{config, server};
/// Runs the server. Kept separate from main.rs to simplify use in tests and benchmarks.
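/// A minimal usage sketch (marked `ignore`, so it is not compiled as a doctest); it assumes a
/// `config::Config` value has already been loaded, e.g. from `config_path`:
///
/// ```ignore
/// let mut runner = Runner::new(config_path, config).await?;
/// runner.run().await?;
/// ```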
pub struct Runner {
// TODO(#4) implement TCP server socket as well
udp_sock: UdpSocket,
config_path: String,
config: config::Config,
stop: Option<oneshot::Receiver<()>>,
}
impl Runner {
/// Creates a new `Runner` instance after setting up any listen sockets.
pub async fn new(config_path: String, config: config::Config) -> Result<Runner> {
        // Initialize the listen socket up-front so that the caller can quickly downgrade to a
        // non-root user if needed (e.g. after binding a privileged port).
let listen_host = config.listen.trim();
let listen_addr = listen_host
.to_socket_addrs()?
.next()
.with_context(|| format!("Invalid listen address: {}", listen_host))?;
let udp_sock = UdpSocket::bind(listen_addr)
.await
.with_context(|| format!("Failed to listen on {}", listen_addr))?;
Ok(Runner {
udp_sock,
config_path,
config,
stop: None,
})
}
    /// Configures the runner with a channel that tells it when to stop processing requests.
    /// For use in tests that want to shut down the runner thread cleanly.
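    /// A sketch of the intended wiring from a test (not compiled as a doctest):
    ///
    /// ```ignore
    /// let (stop_tx, stop_rx) = tokio::sync::oneshot::channel();
    /// runner.set_stop(stop_rx);
    /// // ... later, from the test:
    /// stop_tx.send(()).unwrap();
    /// ```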
    pub fn set_stop(&mut self, stop: oneshot::Receiver<()>) {
self.stop = Some(stop);
}
/// Returns the listen endpoint for the UDP socket.
    /// This is intended for tests, where an ephemeral listen port is used.
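    /// A sketch (not compiled as a doctest), assuming the config's `listen` address was set to
    /// port 0 so the OS assigns an ephemeral port:
    ///
    /// ```ignore
    /// // e.g. with .listen set to "127.0.0.1:0" in the config
    /// let endpoint = runner.get_udp_endpoint();
    /// let port = endpoint.port(); // the ephemeral port chosen by the OS
    /// ```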
    pub fn get_udp_endpoint(&self) -> SocketAddr {
        self.udp_sock
            .local_addr()
            .expect("Couldn't get local UDP socket address")
    }
    /// Runs the server. This runs until one of the following occurs:
    /// - A fatal error is encountered
    /// - A stop signal is received over the channel configured via `set_stop`
    pub async fn run(&mut self) -> Result<()> {
let redis_conn = match self.config.redis.trim() {
"" => None,
url => {
                // Avoid logging the raw URL string since it may contain a password
let conn_info = url
.into_connection_info()
.with_context(|| format!("Failed to parse '{}' as Redis URL", url))?;
info!("Connecting to Redis: {}", conn_info.addr);
let redis_client = redis::Client::open(conn_info.clone())
.with_context(|| format!("Failed to create Redis client for {}", conn_info.addr))?;
let redis_conn = redis_client
.get_connection()
.with_context(|| format!("Redis connection failed for {}", conn_info.addr))?;
Some(redis_conn)
}
};
// TODO(#3) support fallback between list of multiple upstreams someday...
// TODO(#5) support dns-over-tls as "tls://ip:port"
// TODO(#6) support dns-over-https as "https://ip:port"
let query_addr_str = self.config
.upstreams
.get(0)
.expect("missing first upstreams entry")
.trim();
        // If the configured upstream is a hostname, fall back to the system resolver to look it
        // up: at this point it's a chicken-and-egg situation, since we don't yet have an upstream
        // of our own to query. In practice the configuration should use a fixed IP here, but we
        // allow a hostname for convenience. However, we don't attempt to re-resolve, so if the
        // host ever moves then upstream queries will fail.
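        // For example (illustrative only):
        //   "1.1.1.1:53".to_socket_addrs()         // parses the IP directly, no lookup needed
        //   "dns.example.net:53".to_socket_addrs() // blocks on the system resolver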
let query_addr = query_addr_str
.to_socket_addrs()
.with_context(|| format!("Failed to convert upstream to address: {}", query_addr_str))?
.next()
.with_context(|| {
format!(
"Expected .upstream=ip:port in {} but was: {}",
self.config_path, query_addr_str
)
})?;
info!("Using upstream: {:?}", query_addr);
// TODO(#3) reuse the same client object used for user queries, and share its udp_size autodetect
let resolver = Resolver::new(query_addr, false /*ipv6*/, 4096 /*udp_size*/);
let storage_dir = Path::new(self.config.storage.trim());
if !storage_dir.exists() {
create_dir(&storage_dir)
.with_context(|| format!("Failed to create storage directory: {:?}", storage_dir))?;
} else if storage_dir.is_file() {
bail!(
"Specified .storage path in {} is a regular file: {:?}",
self.config_path,
storage_dir
);
}
let download_dir = storage_dir.join("hosts");
if !download_dir.exists() {
create_dir(&download_dir).with_context(|| {
format!(
"Failed to create hosts download directory: {:?}",
                    download_dir
)
})?;
} else if download_dir.is_file() {
bail!(
"Hosts download directory within .storage path from {} is a regular file: {:?}",
self.config_path,
download_dir
);
}
// TODO(#9)
// - do initialization in background: don't block (re)start on a filter being slow to update
// - schedule periodic background update checks for filter files (should only download if URL appears updated)
// - refrain from killing the process if a download fails or something
let mut filter = filter::Filter::new(download_dir, resolver)?;
// Allow these path lists to be unset/empty
for entry in &self.config.overrides {
filter.update_override(entry)?;
}
        {
            // Instrument the async block rather than holding an entered span guard across
            // .await points, which would produce incorrect traces.
            let span = tracing::span!(tracing::Level::INFO, "update-filters");
            async {
                for entry in &self.config.blocks {
                    filter.update_block(entry).await?;
                }
                Ok::<(), anyhow::Error>(())
            }
            .instrument(span)
            .await?;
        }
info!("Waiting for clients at {}", self.udp_sock.local_addr()?);
let mut packet_buffer = BytesMut::with_capacity(4096);
// TODO(#12) could have server pool for accepting received requests (use crossbeam queue?)
// backpressure could be applied by forwarding a limited number of server_members back and forth
let mut server_members = server::ServerMembers::new(
// TODO(#6): Figure out config structure for supporting DoH "bootstrap"
/*Box::new(client::https::Client::new(
resolver,
"https://dns.google/dns-query".to_string(),
10000,
)?),*/
Box::new(client::udp::Client::new(query_addr, 10000)),
Some(Box::new(client::tcp::Client::new(query_addr, 10000))),
redis_conn,
);
loop {
// Ensure that the buffer has a SIZE suitable for socket.recv_from().
// If we just leave it with the CAPACITY then it drops data.
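            // (A fresh BytesMut::with_capacity(4096) has len() == 0, and recv_from() only sees
            // the buffer's current len(), not its capacity().)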
packet_buffer.resize(packet_buffer.capacity(), 0);
let request_source: SocketAddr;
{
if let Some(stop) = &mut self.stop {
// Shutdown support enabled, use a timeout on reads to check periodically for a shutdown signal
loop {
if let Ok(recvresult) = time::timeout(
Duration::from_millis(1000),
self.udp_sock.recv_from(&mut packet_buffer),
)
.await {
// Got something before the timeout, but the "something" might be a (non-timeout) error
let (recvsize, recvfrom) = recvresult?;
// Got a request from somewhere
request_source = recvfrom;
                            // Shorten to the actual size received (doesn't affect the allocation)
packet_buffer.truncate(recvsize);
break;
} else {
match stop.try_recv() {
Ok(()) => {
// Got a timeout, and it looks like we've been told to stop.
debug!("Stopping runner thread");
return Ok(());
}
Err(oneshot::error::TryRecvError::Empty) => {
// Not stopping yet
}
Err(oneshot::error::TryRecvError::Closed) => {
warn!("Upstream stop channel was closed early");
}
}
}
}
} else {
// Shutdown support disabled, just wait forever for the next request
let (recvsize, recvfrom) = self.udp_sock.recv_from(&mut packet_buffer).await?;
// Got a request from somewhere
request_source = recvfrom;
                    // Shorten to the actual size received (doesn't affect the allocation)
packet_buffer.truncate(recvsize);
}
}
trace!(
"Raw request from {:?} ({}b): {:02X?}",
request_source,
packet_buffer.len(),
&packet_buffer[..]
);
            // Instrument the query future rather than holding an entered span guard across
            // .await points, which would produce incorrect traces.
            let span = tracing::span!(tracing::Level::INFO, "udp-query");
            match server::handle_query(&mut server_members, &mut packet_buffer, &filter)
                .instrument(span)
                .await
            {
Ok(()) => {
// Send the response we got back to the original requestor.
// Shouldn't time out but just in case...
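                    // Note: the send result itself is ignored; only a timeout aborts the run loop.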
                    let _sendresult = time::timeout(
                        Duration::from_millis(1000),
                        self.udp_sock.send_to(&packet_buffer, request_source),
)
.await?;
}
Err(ioerr) => {
warn!(
"Failed to handle request from client={}: {:02X?} ({:?})",
request_source,
&packet_buffer[..],
ioerr
);
}
}
}
}
}