~nickbp/kapiti

871b3a56041e95e711a1f13323a4fcfbd5f821c6 — Nick Parker 4 months ago 9d460d0
Clean up TODOs, make cache capacity configurable, use fork with cache entry access fix
6 files changed, 21 insertions(+), 24 deletions(-)

M Cargo.lock
M Cargo.toml
M src/cache/retainer.rs
M src/cache/task.rs
M src/config.rs
M src/hyper_smol.rs
M Cargo.lock => Cargo.lock +1 -2
@@ 1529,8 1529,7 @@ dependencies = [
[[package]]
name = "retainer"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59039dbf4a344af919780e9acdf7f9ce95deffb0152a72eca94b89d6a2bf66c0"
source = "git+https://github.com/nickbp/retainer?rev=51c4b405#51c4b405bc58c33afcc07d894951ac7489f61511"
dependencies = [
 "async-lock",
 "async-timer",

M Cargo.toml => Cargo.toml +1 -1
@@ 38,7 38,7 @@ packed_struct = "0.6"
packed_struct_codegen = "0.6"
rand = "0.8"
redis = { version = "0.21", default-features = false, features = ["script"] } # disable redis async
retainer = "0.2"
retainer = { git = "https://github.com/nickbp/retainer", rev = "51c4b405" } # TODO replace with upstream once https://github.com/whitfin/retainer/pull/8 is merged
rkyv = { version = "0.7.3", features = ["validation"] }
rustls = "0.19"
rustls-native-certs = "0.5"

M src/cache/retainer.rs => src/cache/retainer.rs +6 -11
@@ 44,26 44,21 @@ impl DnsCache for Cache {
    /// Queries Redis for a cached response.
    async fn fetch(&mut self, request_info: RequestInfo) -> Result<Option<Message>> {
        let cache_key = self.key(&request_info);
        // TODO for now, only remove() exposes the full CacheEntry, while get() only exposes the value
        //      so lets 'remove' the entry to get the expiration, then put it back.
        //      the cache task is operating on a single thread/loop, so this should be safe
        match self.cache.remove(&cache_key).await {
            Some(entry) => {
        match self.cache.get(&cache_key).await {
            Some(guard) => {
                let entry = guard.entry();
                let expiration = entry.expiration().with_context(|| {
                    format!("Cache entry lacks expiration for cache_key='{}'", cache_key)
                })?;
                let cache_ttl = expiration.remaining();
                debug!("Cached response for cache_key='{}': (ttl={:?}) {}", cache_key, cache_ttl, entry.value());
                let mut updated_response = (*entry.value()).clone();
                debug!("Cached response for cache_key='{}': (ttl={:?}) {}", cache_key, cache_ttl, updated_response);
                if let Err(e) = message::update_cached_response(&mut updated_response, &request_info, cache_ttl.as_secs() as u32) {
                    warn!("Ignoring cache response at cache_key='{}': {}", cache_key, e);
                    // Skip putting it back since it's apparently bad anyway.
                    // Remove the bad entry before returning
                    self.cache.remove(&cache_key).await;
                    Ok(None)
                } else {
                    // Put the original value back since we just removed it to get the full CacheEntry
                    // TODO drop clone: only here due to "shared reference" via entry
                    //      but this whole thing is a hack anyway so let's just clone for now
                    self.cache.insert(cache_key, (*entry.value()).clone(), *expiration.instant()).await;
                    Ok(Some(updated_response))
                }
            },

M src/cache/task.rs => src/cache/task.rs +1 -5
@@ 43,11 43,7 @@ pub fn start_cache(config: &config::Config) -> Result<(async_channel::Sender<Cac
    // Use redis cache or local cache depending on config
    let redis_url = config.redis.trim();
    let mut cache: Box<dyn cache::DnsCache + Send> = if redis_url.is_empty() {
        Box::new(cache::retainer::Cache::new(
            // 10M max records stored at any one time
            // (TODO how much memory is that?)
            10000000
        ))
        Box::new(cache::retainer::Cache::new(config.cache_size))
    } else {
        Box::new(cache::redis::Cache::new(
            &redis_url,

M src/config.rs => src/config.rs +12 -1
@@ 128,6 128,12 @@ pub struct Config {
    /// Or empty/unset to disable Redis in favor of an internal in-memory cache.
    #[serde(default)]
    pub redis: String,

    /// Max records for internal in-memory cache, or 0 for no limit.
    /// The cache is automatically pruned of records as they expire, so this limit shouldn't be reached except for heavier workloads.
    /// This is ignored if `redis` is provided.
    #[serde(default = "default_cache_size")]
    pub cache_size: usize,
}

impl Config {


@@ 150,6 156,7 @@ impl Config {
            blocks: Vec::new(),
            allows: Vec::new(),
            redis: "".to_string(),
            cache_size: default_cache_size(),
        }
    }
}


@@ 179,7 186,11 @@ fn default_listen_https() -> String {
}

fn default_filter_refresh_seconds() -> u64 {
    10800 // 3 hours
    3 * 60 * 60 // 3 hours
}

fn default_cache_size() -> usize {
    100000 // 100k feels like it should be plenty
}

/// A config value that can be provided either as a string or a list of strings in the TOML file.

M src/hyper_smol.rs => src/hyper_smol.rs +0 -4
@@ 308,10 308,6 @@ impl tokio::io::AsyncRead for SmolStream {
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        // TODO what's this actually doing?
        // - invoked by hyper(?)
        // - invoking underlying std TcpStream(?)
        // TODO looks like we can accept the size from the return value and use that to update the offset in buf?
        match &mut *self {
            SmolStream::Plain(s) => {
                Pin::new(s)