~nickbp/originz

fbaed2a25114cf06aaa5d509c0d19d28ac4faa6d — Nick Parker 2 years ago 5a61a75
Implement benchmark test for UDP client/UDP upstream (#10)

Seeing around 12 kqps if these numbers are right!
8 files changed, 585 insertions(+), 37 deletions(-)

M Cargo.lock
M Cargo.toml
A benches/server.rs
M src/codec/encoder.rs
M src/config.rs
M src/main.rs
M src/runner.rs
M src/server.rs
M Cargo.lock => Cargo.lock +248 -0
@@ 37,6 37,16 @@ dependencies = [
]

[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "hermit-abi 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
 "libc 0.2.71 (registry+https://github.com/rust-lang/crates.io-index)",
 "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "autocfg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 118,6 128,14 @@ dependencies = [
]

[[package]]
name = "cast"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "rustc_version 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "cc"
version = "1.0.54"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 141,6 159,16 @@ dependencies = [
]

[[package]]
name = "clap"
version = "2.33.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "combine"
version = "4.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 172,6 200,83 @@ dependencies = [
]

[[package]]
name = "criterion"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
 "cast 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "clap 2.33.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "criterion-plot 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "itertools 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "num-traits 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
 "oorandom 11.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "plotters 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)",
 "rayon 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
 "regex 1.3.9 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde_derive 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde_json 1.0.55 (registry+https://github.com/rust-lang/crates.io-index)",
 "tinytemplate 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "walkdir 2.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "criterion-plot"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "cast 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "itertools 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "crossbeam-deque"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "crossbeam-epoch"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
 "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "memoffset 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)",
 "scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "crossbeam-queue"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
 "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
 "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "csv"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 429,6 534,14 @@ dependencies = [
]

[[package]]
name = "hermit-abi"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "libc 0.2.71 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "http"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 555,6 668,8 @@ dependencies = [
 "async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
 "bytes 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)",
 "chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)",
 "criterion 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "crossbeam-queue 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "ct-logs 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "flatbuffers 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)",


@@ 577,6 692,7 @@ dependencies = [
 "scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
 "sha2 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
 "toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "tower-service 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",


@@ 638,11 754,24 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "memchr"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "memoffset"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "miniz_oxide"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 719,11 848,25 @@ dependencies = [
]

[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "hermit-abi 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)",
 "libc 0.2.71 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "once_cell"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "oorandom"
version = "11.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "opaque-debug"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 786,6 929,17 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "plotters"
version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "js-sys 0.3.40 (registry+https://github.com/rust-lang/crates.io-index)",
 "num-traits 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)",
 "wasm-bindgen 0.2.63 (registry+https://github.com/rust-lang/crates.io-index)",
 "web-sys 0.3.40 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "ppv-lite86"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 891,6 1045,29 @@ dependencies = [
]

[[package]]
name = "rayon"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "either 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "rayon-core 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "rayon-core"
version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "crossbeam-queue 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
 "crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
 "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
 "num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "redis"
version = "0.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1004,6 1181,14 @@ version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "winapi-util 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "schannel"
version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1185,6 1370,14 @@ dependencies = [
]

[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "thread_local"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1202,6 1395,15 @@ dependencies = [
]

[[package]]
name = "tinytemplate"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
 "serde_json 1.0.55 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "tinyvec"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1372,6 1574,11 @@ dependencies = [
]

[[package]]
name = "unicode-width"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "unicode-xid"
version = "0.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1415,6 1622,16 @@ dependencies = [
]

[[package]]
name = "walkdir"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "same-file 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
 "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
 "winapi-util 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1520,6 1737,14 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"

[[package]]
name = "winapi-util"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
 "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]

[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1567,6 1792,7 @@ dependencies = [
"checksum ansi_term 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
"checksum anyhow 1.0.31 (registry+https://github.com/rust-lang/crates.io-index)" = "85bb70cc08ec97ca5450e6eba421deeea5f172c0fc61f78b5357b2a8e8be195f"
"checksum async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)" = "a265e3abeffdce30b2e26b7a11b222fe37c6067404001b434101457d0385eb92"
"checksum atty 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)" = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
"checksum autocfg 1.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
"checksum base64 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7"
"checksum bit-set 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6e11e16035ea35e4e5997b393eacbf6f63983188f7a2ad25bfb13465f5ad59de"


@@ 1579,13 1805,21 @@ dependencies = [
"checksum byte-tools 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e3b5ca7a04898ad4bcd41c90c5285445ff5b791899bb1b0abdd2a2aa791211d7"
"checksum byteorder 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
"checksum bytes 0.5.5 (registry+https://github.com/rust-lang/crates.io-index)" = "118cf036fbb97d0816e3c34b2d7a1e8cfc60f68fcf63d550ddbe9bd5f59c213b"
"checksum cast 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b9434b9a5aa1450faa3f9cb14ea0e8c53bb5d2b3c1bfd1ab4fc03e9f33fbfb0"
"checksum cc 1.0.54 (registry+https://github.com/rust-lang/crates.io-index)" = "7bbb73db36c1246e9034e307d0fba23f9a2e251faa47ade70c1bd252220c8311"
"checksum cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
"checksum chrono 0.4.11 (registry+https://github.com/rust-lang/crates.io-index)" = "80094f509cf8b5ae86a4966a39b3ff66cd7e2a3e594accec3743ff3fabeab5b2"
"checksum clap 2.33.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bdfa80d47f954d53a35a64987ca1422f495b8d6483c0fe9f7117b36c2a792129"
"checksum combine 4.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b8e5ef862b2df927249f4e2bdc29c1bd13a33105f900884b0c32acdf32aff584"
"checksum core-foundation 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171"
"checksum core-foundation-sys 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac"
"checksum crc32fast 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ba125de2af0df55319f41944744ad91c71113bf74a4646efff39afe1f6842db1"
"checksum criterion 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "63f696897c88b57f4ffe3c69d8e1a0613c7d0e6c4833363c8560fbde9c47b966"
"checksum criterion-plot 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "ddeaf7989f00f2e1d871a26a110f3ed713632feac17f65f03ca938c542618b60"
"checksum crossbeam-deque 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "9f02af974daeee82218205558e51ec8768b48cf524bd01d550abe5573a608285"
"checksum crossbeam-epoch 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
"checksum crossbeam-queue 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570"
"checksum crossbeam-utils 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
"checksum csv 1.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "00affe7f6ab566df61b4be3ce8cf16bc2576bca0963ceb0955e45d514bf9a279"
"checksum csv-core 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "2b2466559f260f48ad25fe6317b3c8dac77b5bdb5763ac7d9d6103530663bc90"
"checksum ct-logs 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "4d3686f5fa27dbc1d76c751300376e167c5a43387f44bb451fd1c24776e49113"


@@ 1615,6 1849,7 @@ dependencies = [
"checksum git-testament-derive 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)" = "526ad4c79ca35ec046176e89787409f1d75d9cd51fa9258c01ca206d4fba9340"
"checksum glob 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
"checksum h2 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "79b7246d7e4b979c03fa093da39cfb3617a96bbeee6310af63991668d7e843ff"
"checksum hermit-abi 0.1.14 (registry+https://github.com/rust-lang/crates.io-index)" = "b9586eedd4ce6b3c498bc3b4dd92fc9f11166aa908a914071953768066c67909"
"checksum http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9"
"checksum http-body 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b"
"checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"


@@ 1634,7 1869,9 @@ dependencies = [
"checksum loom 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "4ecc775857611e1df29abba5c41355cdf540e7e9d4acfdf0f355eefee82330b7"
"checksum matchers 0.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
"checksum matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
"checksum memchr 2.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
"checksum memoffset 0.5.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b4fc2c02a7e374099d4ee95a193111f72d2110197fe200272371758f6c3643d8"
"checksum miniz_oxide 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "791daaae1ed6889560f8c4359194f56648355540573244a5448a83ba1ecc7435"
"checksum mio 0.6.22 (registry+https://github.com/rust-lang/crates.io-index)" = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430"
"checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919"


@@ 1642,7 1879,9 @@ dependencies = [
"checksum nix 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)" = "50e4785f2c3b7589a0d0c1dd60285e1188adac4006e8abd6dd578e1567027363"
"checksum num-integer 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)" = "8d59457e662d541ba17869cf51cf177c0b5f0cbf476c66bdc90bf1edac4f875b"
"checksum num-traits 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)" = "ac267bcc07f48ee5f8935ab0d24f316fb722d7a1292e2913f0cc196b29ffd611"
"checksum num_cpus 1.13.0 (registry+https://github.com/rust-lang/crates.io-index)" = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
"checksum once_cell 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d"
"checksum oorandom 11.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "a170cebd8021a008ea92e4db85a72f80b35df514ec664b296fdcbb654eac0b2c"
"checksum opaque-debug 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "2839e79665f131bdb5782e51f2c6c9599c133c6098982a54c794358bf432529c"
"checksum openssl-probe 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de"
"checksum packed_struct 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "90caf80e74380d94f2aabc83edb900b49123b3132442fb147f9155c87a756281"


@@ 1652,6 1891,7 @@ dependencies = [
"checksum pin-project-internal 0.4.22 (registry+https://github.com/rust-lang/crates.io-index)" = "6a0ffd45cf79d88737d7cc85bfd5d2894bee1139b356e616fe85dc389c61aaf7"
"checksum pin-project-lite 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "282adbf10f2698a7a77f8e983a74b2d18176c19a7fd32a45446139ae7b02b715"
"checksum pin-utils 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
"checksum plotters 0.2.15 (registry+https://github.com/rust-lang/crates.io-index)" = "0d1685fbe7beba33de0330629da9d955ac75bd54f33d7b79f9a895590124f6bb"
"checksum ppv-lite86 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)" = "237a5ed80e274dbc66f86bd59c1e25edc039660be53194b5fe0a482e0f2612ea"
"checksum proc-macro-hack 0.5.16 (registry+https://github.com/rust-lang/crates.io-index)" = "7e0456befd48169b9f13ef0f0ad46d492cf9d2dbb918bcf38e01eed4ce3ec5e4"
"checksum proc-macro-nested 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"


@@ 1665,6 1905,8 @@ dependencies = [
"checksum rand_core 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
"checksum rand_hc 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
"checksum rand_xorshift 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "77d416b86801d23dde1aa643023b775c3a462efc0ed96443add11546cdf1dca8"
"checksum rayon 1.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "62f02856753d04e03e26929f820d0a0a337ebe71f849801eea335d464b349080"
"checksum rayon-core 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e92e15d89083484e11353891f1af602cc661426deb9564c298b270c726973280"
"checksum redis 0.16.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b94c6247d45d78d24481a5b7aca146f414ec0f5e39e175f294d1876b943eeeb"
"checksum redox_syscall 0.1.56 (registry+https://github.com/rust-lang/crates.io-index)" = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
"checksum regex 1.3.9 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3780fcf44b193bc4d09f36d2a3c87b251da4a046c87795a0d35f4f927ad8e6"


@@ 1677,6 1919,7 @@ dependencies = [
"checksum rustls-native-certs 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a75ffeb84a6bd9d014713119542ce415db3a3e4748f0bfce1e1416cd224a23a5"
"checksum rusty-fork 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "cb3dcc6e454c328bb824492db107ab7c0ae8fcffe4ad210136ef014458c1bc4f"
"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
"checksum same-file 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
"checksum schannel 0.1.19 (registry+https://github.com/rust-lang/crates.io-index)" = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75"
"checksum scoped-tls 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "332ffa32bf586782a3efaeb58f127980944bbc8c4d6913a86107ac2a5ab24b28"
"checksum scopeguard 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"


@@ 1699,8 1942,10 @@ dependencies = [
"checksum syn 1.0.33 (registry+https://github.com/rust-lang/crates.io-index)" = "e8d5d96e8cbb005d6959f119f773bfaebb5684296108fb32600c00cde305b2cd"
"checksum synom 0.11.3 (registry+https://github.com/rust-lang/crates.io-index)" = "a393066ed9010ebaed60b9eafa373d4b1baac186dd7e008555b0f702b51945b6"
"checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9"
"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
"checksum thread_local 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14"
"checksum time 0.1.43 (registry+https://github.com/rust-lang/crates.io-index)" = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438"
"checksum tinytemplate 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6d3dc76004a03cec1c5932bca4cdc2e39aaa798e3f82363dd94f9adf6098c12f"
"checksum tinyvec 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "53953d2d3a5ad81d9f844a32f14ebb121f50b650cd59d0ee2a07cf13c617efed"
"checksum tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)" = "d099fa27b9702bed751524694adbe393e18b36b204da91eb1cbbbbb4a5ee2d58"
"checksum tokio-macros 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f0c3acc6aa564495a0f2e1d59fab677cd7f81a19994cfc7f3ad0e64301560389"


@@ 1719,6 1964,7 @@ dependencies = [
"checksum typenum 1.12.0 (registry+https://github.com/rust-lang/crates.io-index)" = "373c8a200f9e67a0c95e62a4f52fbf80c23b4381c05a17845531982fa99e6b33"
"checksum unicode-bidi 0.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "49f2bd0c6468a8230e1db229cff8029217cf623c767ea5d60bfbd42729ea54d5"
"checksum unicode-normalization 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)" = "6fb19cf769fa8c6a80a162df694621ebeb4dafb606470b2b2fce0be40a98a977"
"checksum unicode-width 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "caaa9d531767d1ff2150b9332433f32a24622147e5ebb1f26409d5da67afd479"
"checksum unicode-xid 0.0.4 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f860d7d29cf02cb2f3f359fd35991af3d30bac52c57d265a3c461074cb4dc"
"checksum unicode-xid 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564"
"checksum untrusted 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)" = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"


@@ 1726,6 1972,7 @@ dependencies = [
"checksum version_check 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
"checksum void 1.0.2 (registry+https://github.com/rust-lang/crates.io-index)" = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
"checksum wait-timeout 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9f200f5b12eb75f8c1ed65abd4b2db8a6e1b138a20de009dacee265a2498f3f6"
"checksum walkdir 2.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "777182bc735b6424e1a57516d35ed72cb8019d85c8c9bf536dccb3445c1a2f7d"
"checksum want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
"checksum wasi 0.9.0+wasi-snapshot-preview1 (registry+https://github.com/rust-lang/crates.io-index)" = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
"checksum wasm-bindgen 0.2.63 (registry+https://github.com/rust-lang/crates.io-index)" = "4c2dc4aa152834bc334f506c1a06b866416a8b6697d5c9f75b9a689c8486def0"


@@ 1739,6 1986,7 @@ dependencies = [
"checksum winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)" = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6"
"checksum winapi-build 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
"checksum winapi-i686-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
"checksum winapi-util 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
"checksum winapi-x86_64-pc-windows-gnu 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
"checksum ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
"checksum zstd 0.5.3+zstd.1.4.5 (registry+https://github.com/rust-lang/crates.io-index)" = "01b32eaf771efa709e8308605bbf9319bf485dc1503179ec0469b611937c0cd8"

M Cargo.toml => Cargo.toml +8 -1
@@ 16,6 16,7 @@ async-trait = "0.1"
bytes = "0.5"
ct-logs = "0.7"
chrono = "0.4"
crossbeam-queue = "0.2"
flatbuffers = "0.6"
flate2 = "1.0"
futures = "0.3"


@@ 46,5 47,11 @@ tracing-subscriber = "0.2"
zstd = "0.5"

[dev-dependencies]
criterion = "0.3" # for benchmarks
csv = "1.1" # for examples/update_fbs
proptest = "0.10"
proptest = "0.10" # for property tests
tempfile = "3" # for benchmarks

[[bench]]
name = "server"
harness = false

A benches/server.rs => benches/server.rs +243 -0
@@ 0,0 1,243 @@
#![deny(warnings, rust_2018_idioms)]

use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

use anyhow::{bail, Context, Result};
use bytes::BytesMut;
use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
use crossbeam_queue::ArrayQueue;
use lazy_static::lazy_static;
use packed_struct::prelude::*;
use tempfile;
use tokio::runtime::Runtime;
use tracing::debug;

use kapiti::codec::{domain_name, encoder::DNSMessageEncoder, message};
use kapiti::config::Config;
use kapiti::fbs::dns_enums_generated::{ResourceClass, ResourceType, ResponseCode};
use kapiti::fbs::dns_message_generated::{Question, QuestionArgs};
use kapiti::logging;
use kapiti::runner::Runner;

const LOCAL_EPHEMERAL_ENDPOINT: &str = "127.0.0.1:0";

const STUB_QUERY_IP: IpAddr = IpAddr::V4(Ipv4Addr::new(1, 2, 4, 8));
const STUB_QUERY_NAME: &str = "kapiti.io.";

lazy_static! {
    static ref STUB_REQUEST: BytesMut = write_stub_request().expect("Failed to create stub request");
}

fn build_stub_question<'a>() -> flatbuffers::FlatBufferBuilder<'a> {
    let mut fbb = flatbuffers::FlatBufferBuilder::new_with_capacity(1024);
    let question_args = QuestionArgs {
        name: Some(fbb.create_string(STUB_QUERY_NAME)),
        resource_type: ResourceType::TYPE_A as u16,
        resource_class: ResourceClass::CLASS_INTERNET as u16,
    };
    let question_offset = Question::create(&mut fbb, &question_args);
    fbb.finish_minimal(question_offset);
    fbb
}

/// Writes a DNS response/answer into the provided buffer
fn write_stub_response() -> Result<BytesMut> {
    let question_fbb = build_stub_question();
    let question: Question<'_> = flatbuffers::get_root::<Question<'_>>(question_fbb.finished_data());

    let mut buf = BytesMut::with_capacity(4096);
    DNSMessageEncoder::new().encode_local_response(
        ResponseCode::RESPONSE_NOERROR,
        0,
        &question,
        None,
        Some(STUB_QUERY_IP),
        None,
        &mut buf
    )?;

    Ok(buf)
}

/// Writes a DNS request/question into the provided buffer
fn write_stub_request() -> Result<BytesMut> {
    let mut buf = BytesMut::with_capacity(4096);
    message::write_header_bits(
        message::HeaderBits {
            id: 12345,

            is_response: true,
            op_code: Integer::from(0 /*QUERY*/),
            authoritative: false,
            truncated: false,
            recursion_desired: true,
            recursion_available: true,
            reserved_9: false,
            authentic_data: false,
            checking_disabled: false,
            response_code: Integer::from(ResponseCode::RESPONSE_NOERROR as u8),

            question_count: 1,
            answer_count: 0,
            authority_count: 0,
            additional_count: 0,
        },
        &mut buf,
    )?;

    let question_fbb = build_stub_question();
    let question: Question<'_> = flatbuffers::get_root::<Question<'_>>(question_fbb.finished_data());
    let mut ptr_offsets = domain_name::LabelOffsets::new();
    message::write_question(&question, &mut buf, &mut ptr_offsets)?;

    Ok(buf)
}

fn run_udp_upstream(udp_sock: UdpSocket, stop: Arc<ArrayQueue<()>>) -> Result<()> {
    let mut request_buffer = BytesMut::with_capacity(4096);
    let mut response_buffer = write_stub_response()?;

    loop {
        // Ensure that the buffer has a SIZE suitable for socket.recv_from().
        // If we just leave it with the CAPACITY then it drops data.
        request_buffer.resize(request_buffer.capacity(), 0);
        debug!("Harness reading...");
        match udp_sock.recv_from(&mut request_buffer) {
            Ok((recvsize, recvfrom)) => {
                debug!("Harness got {} bytes from {:?}", recvsize, recvfrom);
                // Ensure that the response has a matching request ID (first two bytes)
                if recvsize < 2 {
                    bail!("Expected request to have at least 2 bytes, but got {}", recvsize);
                }
                response_buffer[0] = request_buffer[0];
                response_buffer[1] = request_buffer[1];

                request_buffer.clear();

                debug!("Harness replying...");
                let sendsize = udp_sock.send_to(&mut response_buffer, recvfrom)?;
                debug!("Harness sent {} bytes to {:?}", sendsize, recvfrom);
            },
            Err(_e) => {
                // Might be a timeout where upstream is waiting for us to exit, or might not be
                if stop.pop().is_ok() {
                    debug!("Stopping harness thread");
                    return Ok(());
                }
            }
        }
    }
}

fn start_udp_upstream(stop: Arc<ArrayQueue<()>>) -> Result<(SocketAddr, JoinHandle<Result<()>>)> {
    let listen_addr_ephemeral = LOCAL_EPHEMERAL_ENDPOINT
        .to_socket_addrs()?
        .next()
        .with_context(|| "Invalid listen address")?;
    let upstream_sock = UdpSocket::bind(listen_addr_ephemeral)
        .with_context(|| format!("Failed to listen on {}", listen_addr_ephemeral))?;
    upstream_sock.set_read_timeout(Some(Duration::from_millis(1000)))?;
    upstream_sock.set_write_timeout(Some(Duration::from_millis(1000)))?;
    let listen_addr_actual = upstream_sock.local_addr()?;
    debug!("Harness running at {:?}", listen_addr_actual);

    Ok((listen_addr_actual, thread::spawn(move || {
        run_udp_upstream(upstream_sock, stop)
            .with_context(|| format!("run_udp_upstream failed"))
    })))
}

struct RunInputs {
    client_sock: UdpSocket,
    response_buffer: BytesMut,
}

fn setup_udp_requests() -> Result<RunInputs> {
    let client_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127,0,0,1)), 0);
    let client_sock = UdpSocket::bind(client_addr)?;
    client_sock.set_read_timeout(Some(Duration::from_millis(5000)))?;
    client_sock.set_write_timeout(Some(Duration::from_millis(5000)))?;
    let response_buffer = BytesMut::with_capacity(4096);
    Ok(RunInputs {
        client_sock,
        response_buffer,
    })
}

/// Sets up and runs `count` requests, waiting for a response after each request.
/// Reinitializes every time so that `move` will work.
fn run_udp_requests(mut inputs: RunInputs, kapiti_udp_endpoint: SocketAddr, count: u64) -> Result<()> {
    for _i in 0..count {
        debug!("Harness sending...");
        let sendsize = inputs.client_sock.send_to(&STUB_REQUEST[..], kapiti_udp_endpoint).with_context(|| "send_to failed")?;
        debug!("Harness sent {} bytes to {:?}", sendsize, kapiti_udp_endpoint);
        // Ensure that the buffer has a SIZE suitable for socket.recv_from().
        // If we just leave it with the CAPACITY then it drops data.
        inputs.response_buffer.resize(inputs.response_buffer.capacity(), 0);
        debug!("Client reading...");
        let (recvsize, recvfrom) = inputs.client_sock.recv_from(&mut inputs.response_buffer).with_context(|| "recv_from failed: no response from kapiti")?;
        debug!("Client got {} bytes from {:?}", recvsize, recvfrom);
        if inputs.response_buffer[0] != STUB_REQUEST[0] || inputs.response_buffer[1] != STUB_REQUEST[1] {
            bail!("Response doesn't have expected request ID:\n- request: {:?}\n-response: {:?}", &STUB_REQUEST[..], inputs.response_buffer);
        }
    }
    Ok(())
}

/// Requests coming in over UDP, upstream endpoint over UDP
fn run_udp_udp_test(c: &mut Criterion) -> Result<()> {
    logging::init_logging();
    let tmpstorage = tempfile::tempdir()?;

    // Use queue to notify threads to stop
    let stop = Arc::new(ArrayQueue::new(2));

    // Start upstream harness
    let (upstream_addr, upstream_join_handle) = start_udp_upstream(stop.clone())?;
    let config = Config::new_for_test(tmpstorage.path().to_str().expect("invalid temp storage path"), upstream_addr.to_string());

    // Start kapiti server
    let mut runtime = Runtime::new()?;
    let mut runner = runtime.block_on(Runner::new("benchmark".to_string(), config))?;
    runner.set_stop(stop.clone());
    let kapiti_udp_endpoint = runner.get_udp_endpoint()?;
    let kapiti_join_handle = thread::spawn(move || {
        runtime.block_on(runner.run())
    });

    // Run benchmark: See how quickly we can get responses from kapiti
    let run_count: u64 = 30;
    let mut group = c.benchmark_group("server");
    group.throughput(Throughput::Elements(run_count));
    group.sample_size(50);
    group.bench_function("udp_udp", |b| b.iter_batched(
        move || setup_udp_requests().expect("setup failed"),
        move |inputs| run_udp_requests(inputs, kapiti_udp_endpoint, run_count).expect("client run failed"),
        BatchSize::PerIteration
    ));
    group.finish();

    // Send shutdown messages to be consumed by the kapiti thread and the upstream thread (not necessarily in that order)
    stop.push(()).expect("unable to fit first message on stop queue");
    stop.push(()).expect("unable to fit second message on stop queue");

    // Wait for kapiti thread to get the message
    debug!("Waiting for kapiti thread to exit...");
    kapiti_join_handle.join().expect("failed to join kapiti thread")?;

    // Wait for upstream thread to get the message
    debug!("Waiting for upstream thread to exit...");
    upstream_join_handle.join().expect("failed to join upstream harness thread")?;

    Ok(())
}

fn udp_udp(c: &mut Criterion) {
    run_udp_udp_test(c).expect("udp_udp test failed");
}

criterion_group!(benches, udp_udp);
criterion_main!(benches);
\ No newline at end of file

M src/codec/encoder.rs => src/codec/encoder.rs +3 -3
@@ 46,7 46,7 @@ impl DNSMessageEncoder {
                op_code: Integer::from(0 /*QUERY*/),
                authoritative: false,
                truncated: false,
                recursion_desired: false,
                recursion_desired: true,
                recursion_available: true,
                reserved_9: false,
                authentic_data: false,


@@ 55,12 55,12 @@ impl DNSMessageEncoder {

                question_count: 1,
                answer_count: match ip {
                    Some(_) => 1,
                    Some(_) => 1, // A or AAA resource
                    None => 0,
                },
                authority_count: match ip {
                    Some(_) => 0,
                    None => 1,
                    None => 1, // SOA resource
                },
                additional_count: match orig_opt {
                    Some(_) => 1,

M src/config.rs => src/config.rs +19 -1
@@ 20,7 20,8 @@ pub struct Config {
    #[serde(default = "default_storage")]
    pub storage: String,

    /// If the service is started as the root user, what user it should "downgrade" to. Defaults to `nobody`.
    /// If the service is started as the root user, what user it should "downgrade" to.
    /// Defaults to `nobody`, or may be set to an empty string to disable.
    #[serde(default = "default_user")]
    pub user: String,



@@ 63,6 64,23 @@ pub struct Config {
    pub redis: String,
}

impl Config {
    /// Returns a new `Config` instance suitable for use in benchmark tests.
    /// Most values are empty or left as their defaults, while the "listen" value is set to `127.0.0.1:0` for an ephemeral port.
    pub fn new_for_test(storage: &str, upstream: String) -> Config {
        Config {
            storage: storage.to_string(),
            // Disable user downgrade to avoid system-specific issues (what if 'nobody' doesn't exist in the test environment?)
            user: "".to_string(),
            listen: "127.0.0.1:0".to_string(),
            upstreams: vec![upstream],
            overrides: vec![],
            blocks: vec![],
            redis: "".to_string(),
        }
    }
}

fn default_storage() -> String {
    "/tmp/kapiti".to_string()
}

M src/main.rs => src/main.rs +5 -4
@@ 51,16 51,17 @@ fn main() -> Result<()> {
    debug!("config: {:?}", config);

    // Get downgrade user before passing ownership of config
    let downgrade_user = config.user.clone();
    let downgrade_user_orig = config.user.clone();
    let downgrade_user = downgrade_user_orig.trim();

    let mut runtime = Runtime::new()?;

    // Needs to run async since it sets up the sockets internally
    let mut runner = runtime.block_on(Runner::new(config_path, config))?;

    // If currently root, downgrade to non-root after Runner has set up any sockets
    if unistd::geteuid().is_root() {
        chuser(downgrade_user.trim())
    // If currently root, downgrade to specified non-root user after Runner has set up any sockets
    if !downgrade_user.is_empty() && unistd::geteuid().is_root() {
        chuser(downgrade_user)
            .with_context(|| format!("Failed to change to user: {}", downgrade_user))?;
        info!("Changed to user: {}", downgrade_user);
    }

M src/runner.rs => src/runner.rs +52 -27
@@ 3,14 3,16 @@
use std::fs::create_dir;
use std::net::{SocketAddr, ToSocketAddrs};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Context, Result};
use bytes::BytesMut;
use crossbeam_queue::ArrayQueue;
use redis::{self, IntoConnectionInfo};
use tokio::net::UdpSocket;
use tokio::time;
use tracing::{self, info, trace, warn};
use tracing::{self, debug, info, trace, warn};

use crate::client::{self, hyper::Resolver};
use crate::filter::filter;


@@ 19,10 21,10 @@ use crate::{config, server};
/// Runs the server. Separate from main.rs to simplify testing in benchmarks
pub struct Runner {
    // TODO(#4) implement TCP server socket as well
    server_sock: UdpSocket,
    udp_sock: UdpSocket,
    config_path: String,
    config: config::Config,
    request_limit: u32,
    stop: Option<Arc<ArrayQueue<()>>>,
}

impl Runner {


@@ 34,23 36,28 @@ impl Runner {
            .to_socket_addrs()?
            .next()
            .with_context(|| format!("Invalid listen address: {}", listen_host))?;
        let server_sock = UdpSocket::bind(listen_addr)
        let udp_sock = UdpSocket::bind(listen_addr)
            .await
            .with_context(|| format!("Failed to listen on {}", listen_addr))?;

        Ok(Runner {
            server_sock,
            udp_sock,
            config_path,
            config,
            request_limit: 0,
            stop: None,
        })
    }

    /// Sets a limit on the number of requests to be served before exiting the run loop.
    /// This is mainly for use in tests where we don't want the server to run forever.
    /// The default is zero, for no limit.
    pub fn set_request_limit(self: &mut Runner, limit: u32) {
        self.request_limit = limit;
    /// Configures the runner with a queue that will tell the runner when to stop processing requests.
    /// For use in tests where we want to cleanly shut down the runner thread.
    pub fn set_stop(self: &mut Runner, stop: Arc<ArrayQueue<()>>) {
        self.stop = Some(stop);
    }

    /// Returns the listen endpoint for the UDP socket.
    /// This is for testing cases, where an ephemeral listen port is being used.
    pub fn get_udp_endpoint(self: &Runner) -> Result<SocketAddr> {
        return self.udp_sock.local_addr().with_context(|| "Couldn't get local UDP socket address");
    }

    /// Runs the server. This should run until one of the following occurs:


@@ 146,18 153,19 @@ impl Runner {
            }
        };

        info!("Waiting for clients at {}", self.server_sock.local_addr()?);
        info!("Waiting for clients at {}", self.udp_sock.local_addr()?);

        let mut packet_buffer = BytesMut::with_capacity(4096);
        // TODO(#12) could have server pool for accepting received requests (use crossbeam queue?)
        //           backpressure could be applied by forwarding a limited number of server_members back and forth
        let mut server_members = server::ServerMembers::new(
            // TODO(#6): Figure out config structure for supporting DoH "bootstrap"
            Box::new(client::https::Client::new(
            /*Box::new(client::https::Client::new(
                resolver,
                "https://dns.google/dns-query".to_string(),
                10000,
            )?),
            )?),*/
            Box::new(client::udp::Client::new(query_addr, 10000)),
            Some(Box::new(client::tcp::Client::new(query_addr, 10000))),
            redis_conn,
        );


@@ 168,11 176,35 @@ impl Runner {

            let request_source: SocketAddr;
            {
                let (recvsize, recvfrom) = self.server_sock.recv_from(&mut packet_buffer).await?;
                // Got a request from somewhere
                request_source = recvfrom;
                // Shorten to actual size received (doesnt affect malloc)
                packet_buffer.truncate(recvsize);
                if let Some(stop) = &self.stop {
                    // Shutdown support enabled, use a timeout on reads to check periodically for a shutdown signal
                    loop {
                        if let Ok(recvresult) = time::timeout(
                            Duration::from_millis(1000),
                            self.udp_sock.recv_from(&mut packet_buffer),
                        )
                        .await {
                            // Got something before the timeout, but the "something" might be a (non-timeout) error
                            let (recvsize, recvfrom) = recvresult?;
                            // Got a request from somewhere
                            request_source = recvfrom;
                            // Shorten to actual size received (doesnt affect malloc)
                            packet_buffer.truncate(recvsize);
                            break;
                        } else if stop.pop().is_ok() {
                            // Got a timeout, and it looks like we've been told to stop.
                            debug!("Stopping runner thread");
                            return Ok(());
                        }
                    }
                } else {
                    // Shutdown support disabled, just wait forever for the next request
                    let (recvsize, recvfrom) = self.udp_sock.recv_from(&mut packet_buffer).await?;
                    // Got a request from somewhere
                    request_source = recvfrom;
                    // Shorten to actual size received (doesnt affect malloc)
                    packet_buffer.truncate(recvsize);
                }
            }

            trace!(


@@ 190,7 222,7 @@ impl Runner {
                    // Shouldn't time out but just in case...
                    let _sendsize = time::timeout(
                        Duration::from_millis(1000),
                        self.server_sock.send_to(&mut packet_buffer, request_source),
                        self.udp_sock.send_to(&mut packet_buffer, request_source),
                    )
                    .await?;
                }


@@ 203,13 235,6 @@ impl Runner {
                    );
                }
            }

            if self.request_limit > 0 {
                self.request_limit -= 1;
                if self.request_limit == 0 {
                    return Ok(());
                }
            }
        }
    }
}

M src/server.rs => src/server.rs +7 -1
@@ 110,6 110,8 @@ pub async fn handle_query<'a>(
                bail!("Failed to parse incomplete request");
            }
            let request: Message<'_> = flatbuffers::get_root::<Message<'_>>(fbb.finished_data());
            // Mark the client buffer as empty so that we don't append on top of a prior request
            m.client_buffer.clear();
            fallback_client.encode(&request, &mut m.client_buffer)?;

            if let Some(_response) = fallback_client


@@ 143,7 145,7 @@ pub async fn handle_query<'a>(
/// In the general case we only call this once, but if there's a UDP->TCP fallback then we will call this twice, since the request encoding is slightly different between the two clients. But this fallback should be rare in practice so the cost is low.
/// - Decodes the request payload, turning it into a Message
/// - Extracts some useful info from the Message and uses it to check the Filter
/// - If the filter doesn't match, encodes the request to be sent to the DnsClient
/// - If the filter doesn't match, encodes the request to be sent to the DnsClient in `client_buffer`
async fn decode_request_check_local_response(
    client: &mut Box<dyn DnsClient>,
    redis_conn: &mut Option<redis::Connection>,


@@ 194,6 196,8 @@ async fn decode_request_check_local_response(
                }
                Ok(None)
            } else {
                // Mark the client buffer as empty so that we don't append on top of a prior request
                client_buffer.clear();
                // Filter and cache both missed: Send the encoded request to the client.
                client.encode(&request, client_buffer)?;
                Ok(Some(request_info))


@@ 204,6 208,8 @@ async fn decode_request_check_local_response(
                "No filter entry found for {}, performing upstream query",
                request_info.name
            );
            // Mark the client buffer as empty so that we don't append on top of a prior request
            client_buffer.clear();
            client.encode(&request, client_buffer)?;
            Ok(Some(request_info))
        }