~nickbp/kapiti

ref: 871b3a56041e95e711a1f13323a4fcfbd5f821c6 kapiti/src/cache/task.rs -rw-r--r-- 2.8 KiB
871b3a56Nick Parker Clean up TODOs, make cache capacity configurable, use fork with cache entry access fix 2 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use async_lock::{Barrier, Mutex};
use smol::Task;
use tracing::{trace, warn};

use crate::codec::message::RequestInfo;
use crate::specs::message::Message;
use crate::{cache, config};

/// A fetch request, and a path for sending the response.
///
/// The cache task spawned by `start_cache` handles this by storing the lookup result into
/// `result` and then waiting on `result_barrier`.
pub struct CacheFetch {
    /// Describes the lookup; handed to the cache backend's `fetch` call.
    pub request_info: RequestInfo,
    /// Barrier to wait for the result to appear. The requestor should wait on this before accessing result.
    /// The cache task also waits on it, after it has written `result`.
    /// NOTE(review): presumably created for exactly two parties (requestor + cache task) - confirm at the call site.
    pub result_barrier: Arc<Barrier>,
    /// Where the result should go.
    /// The cache task replaces the contents with `Some(..)` wrapping whatever the backend's `fetch` returned.
    /// NOTE(review): the inner `Option<Message>` presumably is `None` on a cache miss - confirm against `DnsCache::fetch`.
    pub result: Arc<Mutex<Option<Result<Option<Message>>>>>,
}

/// A store request. Nothing is reported back to the sender: if the cache backend's `store` call
/// fails, the cache task logs a warning and carries on.
pub struct CacheStore {
    /// Describes the request that `response` answers; handed to the cache backend's `store` call.
    pub request_info: RequestInfo,
    /// The response message to be stored.
    pub response: Message,
}

/// A message for the cache task, sent over the channel returned by `start_cache`.
pub enum CacheMsg {
    /// Look up an entry; the result is delivered through the fields of the `CacheFetch`.
    Fetch(CacheFetch),
    /// Store a response; no result is delivered back to the sender.
    Store(CacheStore),
}

/// Spawns the cache task and hands back the channel sender used to submit requests to it,
/// together with the task handle. The task keeps serving requests until receiving from the
/// channel fails.
///
/// The cache backend is picked from `config`: a blank `redis` setting selects the local cache
/// sized by `cache_size`, anything else is passed as the URL of the Redis-backed cache.
///
/// # Errors
///
/// Returns an error if the Redis-backed cache cannot be constructed.
pub fn start_cache(config: &config::Config) -> Result<(async_channel::Sender<CacheMsg>, Task<()>)> {
    // Cache lookups/updates go through a channel rather than a shared/mutexed cache object:
    // the cache may need to make await calls internally, which does not mix well with holding
    // a mutex lock guard.
    let (cache_tx, cache_rx) = async_channel::bounded::<CacheMsg>(32);

    // A non-blank redis URL selects the redis cache, otherwise fall back to the local cache.
    let redis_url = config.redis.trim();
    let mut backend: Box<dyn cache::DnsCache + Send> = if !redis_url.is_empty() {
        // Read/write timeout - 1s is an eternity for DNS, and for Redis.
        let redis_timeout = Duration::from_millis(1000);
        Box::new(cache::redis::Cache::new(&redis_url, redis_timeout)?)
    } else {
        Box::new(cache::retainer::Cache::new(config.cache_size))
    };

    let task = smol::spawn(async move {
        trace!("Cache task waiting for requests");
        loop {
            let msg = match cache_rx.recv().await {
                Ok(received) => received,
                // Nothing more can be received: stop serving.
                Err(_) => break,
            };
            match msg {
                CacheMsg::Store(store) => {
                    trace!("Cache store: {}", store.request_info.name);
                    // Store failures are only logged; the sender is not waiting for an answer.
                    if let Err(err) = backend.store(store.request_info, store.response).await {
                        warn!("Cache store failed: {}", err);
                    }
                }
                CacheMsg::Fetch(fetch) => {
                    trace!("Cache fetch: {}", fetch.request_info.name);
                    let lookup = backend.fetch(fetch.request_info).await;
                    // Publish the result first; the lock guard is released at the end of this
                    // statement, before we meet the requestor at the barrier.
                    *fetch.result.lock().await = Some(lookup);
                    fetch.result_barrier.wait().await;
                }
            }
        }
    });

    Ok((cache_tx, task))
}