M Cargo.lock => Cargo.lock +1 -2
@@ 1529,8 1529,7 @@ dependencies = [
[[package]]
name = "retainer"
version = "0.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "59039dbf4a344af919780e9acdf7f9ce95deffb0152a72eca94b89d6a2bf66c0"
+source = "git+https://github.com/nickbp/retainer?rev=51c4b405#51c4b405bc58c33afcc07d894951ac7489f61511"
dependencies = [
"async-lock",
"async-timer",
M Cargo.toml => Cargo.toml +1 -1
@@ 38,7 38,7 @@ packed_struct = "0.6"
packed_struct_codegen = "0.6"
rand = "0.8"
redis = { version = "0.21", default-features = false, features = ["script"] } # disable redis async
-retainer = "0.2"
+retainer = { git = "https://github.com/nickbp/retainer", rev = "51c4b405" } # TODO replace with upstream once https://github.com/whitfin/retainer/pull/8 is merged
rkyv = { version = "0.7.3", features = ["validation"] }
rustls = "0.19"
rustls-native-certs = "0.5"
M src/cache/retainer.rs => src/cache/retainer.rs +6 -11
@@ 44,26 44,21 @@ impl DnsCache for Cache {
/// Queries Redis for a cached response.
async fn fetch(&mut self, request_info: RequestInfo) -> Result<Option<Message>> {
let cache_key = self.key(&request_info);
- // TODO for now, only remove() exposes the full CacheEntry, while get() only exposes the value
- // so lets 'remove' the entry to get the expiration, then put it back.
- // the cache task is operating on a single thread/loop, so this should be safe
- match self.cache.remove(&cache_key).await {
- Some(entry) => {
+ match self.cache.get(&cache_key).await {
+ Some(guard) => {
+ let entry = guard.entry();
let expiration = entry.expiration().with_context(|| {
format!("Cache entry lacks expiration for cache_key='{}'", cache_key)
})?;
let cache_ttl = expiration.remaining();
- debug!("Cached response for cache_key='{}': (ttl={:?}) {}", cache_key, cache_ttl, entry.value());
let mut updated_response = (*entry.value()).clone();
+ debug!("Cached response for cache_key='{}': (ttl={:?}) {}", cache_key, cache_ttl, updated_response);
if let Err(e) = message::update_cached_response(&mut updated_response, &request_info, cache_ttl.as_secs() as u32) {
warn!("Ignoring cache response at cache_key='{}': {}", cache_key, e);
- // Skip putting it back since it's apparently bad anyway.
+ // Remove the bad entry before returning
+ self.cache.remove(&cache_key).await;
Ok(None)
} else {
- // Put the original value back since we just removed it to get the full CacheEntry
- // TODO drop clone: only here due to "shared reference" via entry
- // but this whole thing is a hack anyway so let's just clone for now
- self.cache.insert(cache_key, (*entry.value()).clone(), *expiration.instant()).await;
Ok(Some(updated_response))
}
},
M src/cache/task.rs => src/cache/task.rs +1 -5
@@ 43,11 43,7 @@ pub fn start_cache(config: &config::Config) -> Result<(async_channel::Sender<Cac
// Use redis cache or local cache depending on config
let redis_url = config.redis.trim();
let mut cache: Box<dyn cache::DnsCache + Send> = if redis_url.is_empty() {
- Box::new(cache::retainer::Cache::new(
- // 10M max records stored at any one time
- // (TODO how much memory is that?)
- 10000000
- ))
+ Box::new(cache::retainer::Cache::new(config.cache_size))
} else {
Box::new(cache::redis::Cache::new(
&redis_url,
M src/config.rs => src/config.rs +12 -1
@@ 128,6 128,12 @@ pub struct Config {
/// Or empty/unset to disable Redis in favor of an internal in-memory cache.
#[serde(default)]
pub redis: String,
+
+ /// Max records for internal in-memory cache, or 0 for no limit.
+/// The cache is automatically pruned of records as they expire, so this limit shouldn't be reached except under heavier workloads.
+ /// This is ignored if `redis` is provided.
+ #[serde(default = "default_cache_size")]
+ pub cache_size: usize,
}
impl Config {
@@ 150,6 156,7 @@ impl Config {
blocks: Vec::new(),
allows: Vec::new(),
redis: "".to_string(),
+ cache_size: default_cache_size(),
}
}
}
@@ 179,7 186,11 @@ fn default_listen_https() -> String {
}
fn default_filter_refresh_seconds() -> u64 {
- 10800 // 3 hours
+ 3 * 60 * 60 // 3 hours
+}
+
+fn default_cache_size() -> usize {
100000 // 100k records should be ample for typical workloads
}
/// A config value that can be provided either as a string or a list of strings in the TOML file.
M src/hyper_smol.rs => src/hyper_smol.rs +0 -4
@@ 308,10 308,6 @@ impl tokio::io::AsyncRead for SmolStream {
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
- // TODO what's this actually doing?
- // - invoked by hyper(?)
- // - invoking underlying std TcpStream(?)
- // TODO looks like we can accept the size from the return value and use that to update the offset in buf?
match &mut *self {
SmolStream::Plain(s) => {
Pin::new(s)