~nickbp/originz

ref: d94f181c5fbfdf0f62b40ea46d3f301f4ec681e5 originz/src/runner.rs -rw-r--r-- 10.7 KiB
d94f181cNick Parker Backport current benchmark to older code 1 year, 8 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
#![deny(warnings)]

use std::fs::create_dir;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::Path;
use std::time::Duration;

use anyhow::{bail, Context, Result};
use bytes::BytesMut;
use redis::{self, IntoConnectionInfo};
use tokio::net::UdpSocket;
use tokio::sync::oneshot;
use tokio::time;
use tracing::{self, debug, info, trace, warn};

use crate::client::{self, hyper::Resolver};
use crate::filter::filter;
use crate::{config, server};

/// Runs the server. Separate from main.rs to simplify testing in benchmarks
pub struct Runner {
    // TODO(#4) implement TCP server socket as well
    /// UDP socket that client queries are received on; bound in `Runner::new`.
    udp_sock: UdpSocket,
    /// Path of the config file; used when building error messages in `run`.
    config_path: String,
    /// Configuration values: `new` reads `listen`, while `run` reads
    /// `redis`, `upstreams`, `storage`, `overrides` and `blocks`.
    config: config::Config,
    /// Optional shutdown signal registered via `set_stop`. When `None`, `run` waits
    /// for requests without periodically checking for a stop request.
    stop: Option<oneshot::Receiver<()>>,
}

impl Runner {
    /// Creates a new `Runner` instance after setting up any listen sockets.
    ///
    /// The (trimmed) `listen` value of `config` is converted to socket addresses and the UDP
    /// socket is bound to the first one. `config_path` is stored so that later error messages
    /// can point at the config file.
    ///
    /// # Errors
    ///
    /// Returns an error if the listen address cannot be converted to a socket address, if the
    /// conversion yields no addresses, or if binding the UDP socket fails.
    pub async fn new(config_path: String, config: config::Config) -> Result<Runner> {
        // Initialize listen socket up-front so that upstream can quickly downgrade the user to non-root if needed.
        let listen_host = config.listen.trim();
        let listen_addr = listen_host
            .to_socket_addrs()
            // Without this context the caller only sees a bare I/O error with no hint of which value was bad.
            .with_context(|| format!("Failed to convert listen value to address: {}", listen_host))?
            .next()
            .with_context(|| format!("Invalid listen address: {}", listen_host))?;
        let udp_sock = UdpSocket::bind(listen_addr)
            .await
            .with_context(|| format!("Failed to listen on {}", listen_addr))?;

        Ok(Runner {
            udp_sock,
            config_path,
            config,
            // No shutdown channel by default; tests can register one via `set_stop`.
            stop: None,
        })
    }

    /// Configures the runner with a queue that will tell the runner when to stop processing requests.
    /// For use in tests where we want to cleanly shut down the runner thread.
    pub fn set_stop(self: &mut Runner, stop: oneshot::Receiver<()>) {
        self.stop = Some(stop);
    }

    /// Returns the listen endpoint for the UDP socket.
    /// This is for testing cases, where an ephemeral listen port is being used.
    pub fn get_udp_endpoint(self: &Runner) -> SocketAddr {
        return self.udp_sock.local_addr().expect("Couldn't get local UDP socket address");
    }

    /// Runs the server. This should run until one of the following occurs:
    /// - A fatal error
    /// - Once a configured request limit has been reached
    pub async fn run(self: &mut Runner) -> Result<()> {
        let redis_conn = match self.config.redis.trim() {
            "" => None,
            url => {
                // We avoid logging valid url string since it may contain a password
                let conn_info = url
                    .into_connection_info()
                    .with_context(|| format!("Failed to parse '{}' as Redis URL", url))?;
                info!("Connecting to Redis: {}", conn_info.addr);
                let redis_client = redis::Client::open(conn_info.clone())
                    .with_context(|| format!("Failed to create Redis client for {}", conn_info.addr))?;
                let redis_conn = redis_client
                    .get_connection()
                    .with_context(|| format!("Redis connection failed for {}", conn_info.addr))?;
                Some(redis_conn)
            }
        };

        // TODO(#3) support fallback between list of multiple upstreams someday...
        // TODO(#5) support dns-over-tls as "tls://ip:port"
        // TODO(#6) support dns-over-https as "https://ip:port"
        let query_addr_str = self.config
            .upstreams
            .get(0)
            .expect("missing first upstreams entry")
            .trim();
        // If the configured upstream is a hostname, then just give up and use the system resolver.
        // At this point it's a chicken-and-egg situation and we don't have any upstream to query.
        // In practice the configuration should just use a fixed IP for this, but we allow it for convenience.
        // However, we don't attempt to re-resolve so if the host ever moves then upstream queries will fail.
        let query_addr = query_addr_str
            .to_socket_addrs()
            .with_context(|| format!("Failed to convert upstream to address: {}", query_addr_str))?
            .next()
            .with_context(|| {
                format!(
                    "Expected .upstream=ip:port in {} but was: {}",
                    self.config_path, query_addr_str
                )
            })?;
        info!("Using upstream: {:?}", query_addr);
        // TODO(#3) reuse the same client object used for user queries, and share its udp_size autodetect
        let resolver = Resolver::new(query_addr, false /*ipv6*/, 4096 /*udp_size*/);

        let storage_dir = Path::new(self.config.storage.trim());
        if !storage_dir.exists() {
            create_dir(&storage_dir)
                .with_context(|| format!("Failed to create storage directory: {:?}", storage_dir))?;
        } else if storage_dir.is_file() {
            bail!(
                "Specified .storage path in {} is a regular file: {:?}",
                self.config_path,
                storage_dir
            );
        }

        let download_dir = storage_dir.join("hosts");
        if !download_dir.exists() {
            create_dir(&download_dir).with_context(|| {
                format!(
                    "Failed to create hosts download directory: {:?}",
                    storage_dir
                )
            })?;
        } else if download_dir.is_file() {
            bail!(
                "Hosts download directory within .storage path from {} is a regular file: {:?}",
                self.config_path,
                download_dir
            );
        }

        // TODO(#9)
        // - do initialization in background: don't block (re)start on a filter being slow to update
        // - schedule periodic background update checks for filter files (should only download if URL appears updated)
        // - refrain from killing the process if a download fails or something
        let mut filter = filter::Filter::new(download_dir, resolver)?;
        // Allow these path lists to be unset/empty
        for entry in &self.config.overrides {
            filter.update_override(entry)?;
        }

        {
            let span = tracing::span!(tracing::Level::INFO, "update-filters");
            let _enter = span.enter();
            for entry in &self.config.blocks {
                filter.update_block(entry).await?;
            }
        };

        info!("Waiting for clients at {}", self.udp_sock.local_addr()?);

        let mut packet_buffer = BytesMut::with_capacity(4096);
        // TODO(#12) could have server pool for accepting received requests (use crossbeam queue?)
        //           backpressure could be applied by forwarding a limited number of server_members back and forth
        let mut server_members = server::ServerMembers::new(
            // TODO(#6): Figure out config structure for supporting DoH "bootstrap"
            /*Box::new(client::https::Client::new(
                resolver,
                "https://dns.google/dns-query".to_string(),
                10000,
            )?),*/
            Box::new(client::udp::Client::new(query_addr, 10000)),
            Some(Box::new(client::tcp::Client::new(query_addr, 10000))),
            redis_conn,
        );
        loop {
            // Ensure that the buffer has a SIZE suitable for socket.recv_from().
            // If we just leave it with the CAPACITY then it drops data.
            packet_buffer.resize(packet_buffer.capacity(), 0);

            let request_source: SocketAddr;
            {
                if let Some(stop) = &mut self.stop {
                    // Shutdown support enabled, use a timeout on reads to check periodically for a shutdown signal
                    loop {
                        if let Ok(recvresult) = time::timeout(
                            Duration::from_millis(1000),
                            self.udp_sock.recv_from(&mut packet_buffer),
                        )
                        .await {
                            // Got something before the timeout, but the "something" might be a (non-timeout) error
                            let (recvsize, recvfrom) = recvresult?;
                            // Got a request from somewhere
                            request_source = recvfrom;
                            // Shorten to actual size received (doesnt affect malloc)
                            packet_buffer.truncate(recvsize);
                            break;
                        } else {
                            match stop.try_recv() {
                                Ok(()) => {
                                    // Got a timeout, and it looks like we've been told to stop.
                                    debug!("Stopping runner thread");
                                    return Ok(());
                                }
                                Err(oneshot::error::TryRecvError::Empty) => {
                                    // Not stopping yet
                                }
                                Err(oneshot::error::TryRecvError::Closed) => {
                                    warn!("Upstream stop channel was closed early");
                                }
                            }
                        }
                    }
                } else {
                    // Shutdown support disabled, just wait forever for the next request
                    let (recvsize, recvfrom) = self.udp_sock.recv_from(&mut packet_buffer).await?;
                    // Got a request from somewhere
                    request_source = recvfrom;
                    // Shorten to actual size received (doesnt affect malloc)
                    packet_buffer.truncate(recvsize);
                }
            }

            trace!(
                "Raw request from {:?} ({}b): {:02X?}",
                request_source,
                packet_buffer.len(),
                &packet_buffer[..]
            );

            let span = tracing::span!(tracing::Level::INFO, "udp-query");
            let _enter = span.enter();
            match server::handle_query(&mut server_members, &mut packet_buffer, &filter).await {
                Ok(()) => {
                    // Send the response we got back to the original requestor.
                    // Shouldn't time out but just in case...
                    let _sendsize = time::timeout(
                        Duration::from_millis(1000),
                        self.udp_sock.send_to(&mut packet_buffer, request_source),
                    )
                    .await?;
                }
                Err(ioerr) => {
                    warn!(
                        "Failed to handle request from client={}: {:02X?} ({:?})",
                        request_source,
                        &packet_buffer[..],
                        ioerr
                    );
                }
            }
        }
    }
}