~nickbp/force-rss

e145e488830a31e4850368d350a89e2a6edf8c1b — Nick Parker 3 years ago de3bc32
Implement caching, with a bit of cleanup
5 files changed, 137 insertions(+), 65 deletions(-)

M src/http.rs
M src/logging.rs
M src/main.rs
M src/rss.rs
M src/twitch.rs
M src/http.rs => src/http.rs +75 -28
@@ 2,6 2,7 @@ use anyhow::{Context, Result};
use async_lock::Mutex;
use async_std::task;
use std::sync::Arc;
use std::time::Duration;
use tide::Request;
use tracing::{debug, error, info, warn};



@@ 11,8 12,11 @@ struct State {
    twitch_auth: twitch::TwitchAuth,
    thumbnail_width: String,
    thumbnail_height: String,
    user_cache: retainer::cache::Cache<String, usize>,
    video_cache: retainer::cache::Cache<usize, Vec<twitch::GetVideosEntry>>,
    user_cache_secs: u64,
    video_cache_secs: u64,
    username_to_user_cache: Arc<retainer::cache::Cache<String, twitch::GetUsersEntry>>,
    userid_to_videos_cache: Arc<retainer::cache::Cache<String, Vec<twitch::GetVideosEntry>>>,
    _cache_monitors: Vec<async_std::task::JoinHandle<()>>,
}

pub fn run(


@@ 20,21 24,42 @@ pub fn run(
    twitch_auth: twitch::TwitchAuth,
    thumbnail_width: String,
    thumbnail_height: String,
    user_cache_secs: u64,
    video_cache_secs: u64,
) -> Result<()> {
    let state = State {
    let mut state = State {
        twitch_auth,
        thumbnail_width,
        thumbnail_height,
        user_cache: retainer::cache::Cache::new(),
        video_cache: retainer::cache::Cache::new(),
        user_cache_secs,
        video_cache_secs,
        username_to_user_cache: Arc::new(retainer::cache::Cache::new()),
        userid_to_videos_cache: Arc::new(retainer::cache::Cache::new()),
        _cache_monitors: vec![],
    };
    let user_cache_clone = state.username_to_user_cache.clone();
    state._cache_monitors.push(task::spawn(async move {
        // Every 3s, check 4 entries for expiration.
        // If >25% were expired, then repeat the check for another 4 entries
        user_cache_clone
            .monitor(4, 0.25, Duration::from_secs(3))
            .await
    }));
    let video_cache_clone = state.userid_to_videos_cache.clone();
    state._cache_monitors.push(task::spawn(async move {
        // Every 3s, check 4 entries for expiration.
        // If >25% were expired, then repeat the check for another 4 entries
        video_cache_clone
            .monitor(4, 0.25, Duration::from_secs(3))
            .await
    }));

    let mut app = tide::with_state(Arc::new(Mutex::new(state)));
    // Enable request logging if debug (or trace)
    app.with(LogMiddleware {});
    app.at("/").get(handle_index);
    app.at("/rss").get(handle_rss);
    info!("Listening at {}", listen_endpoint);
    info!("Listening at LISTEN={}", listen_endpoint);
    return task::block_on(app.listen(listen_endpoint)).context("HTTP listener exited");
}



@@ 66,33 91,55 @@ async fn handle_rss(req: Request<Arc<Mutex<State>>>) -> tide::Result {

    match query_account {
        Some(account) => {
            // TODO check userid cache, then try get_user_id
            // TODO check video cache, then try get_videos
            let response_body: String;
            {
                let mut state = req.state().lock().await;
                let user: twitch::GetUsersEntry;
                let videos: Vec<twitch::GetVideosEntry>;
                match twitch::get_user(&mut state.twitch_auth, account).await {
                    Ok(u) => {
                        user = u;
                    }
                    Err(e) => {
                        return Ok(tide::Response::builder(400)
                            .body(format!("400 Bad Request: {}", e))
                            .content_type(tide::http::mime::PLAIN)
                            .build());
                    }
                };
                match twitch::get_videos(&mut state.twitch_auth, &user.id).await {
                    Ok(v) => {
                        videos = v;
                    }
                    Err(e) => {
                        return Ok(tide::Response::builder(400)
                            .body(format!("400 Bad Request: {}", e))
                            .content_type(tide::http::mime::PLAIN)
                            .build());
                if let Some(u) = state.username_to_user_cache.get(&account).await {
                    user = (*u).clone();
                } else {
                    match twitch::get_user(&mut state.twitch_auth, &account).await {
                        Ok(u) => {
                            state
                                .username_to_user_cache
                                .insert(
                                    account,
                                    u.clone(),
                                    Duration::from_secs(state.user_cache_secs),
                                )
                                .await;
                            user = u;
                        }
                        Err(e) => {
                            return Ok(tide::Response::builder(400)
                                .body(format!("400 Bad Request: {}", e))
                                .content_type(tide::http::mime::PLAIN)
                                .build());
                        }
                    };
                }
                if let Some(v) = state.userid_to_videos_cache.get(&user.id).await {
                    videos = (*v).clone();
                } else {
                    match twitch::get_videos(&mut state.twitch_auth, &user.id).await {
                        Ok(v) => {
                            state
                                .userid_to_videos_cache
                                .insert(
                                    user.id.clone(),
                                    v.clone(),
                                    Duration::from_secs(state.video_cache_secs),
                                )
                                .await;
                            videos = v;
                        }
                        Err(e) => {
                            return Ok(tide::Response::builder(400)
                                .body(format!("400 Bad Request: {}", e))
                                .content_type(tide::http::mime::PLAIN)
                                .build());
                        }
                    }
                };
                response_body = rss::to_rss(

M src/logging.rs => src/logging.rs +5 -5
@@ 1,6 1,5 @@
use tracing;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{fmt, EnvFilter};
use tracing_subscriber::EnvFilter;

pub fn init_logging() {
    let filter_layer = EnvFilter::try_from_env("LOG_LEVEL")


@@ 8,9 7,10 @@ pub fn init_logging() {
        .expect("Failed to initialize filter layer");

    tracing::subscriber::set_global_default(
        tracing_subscriber::Registry::default()
            .with(filter_layer)
            .with(fmt::layer()),
        tracing_subscriber::fmt()
            .with_writer(std::io::stderr)
            .with_env_filter(filter_layer)
            .finish(),
    )
    .expect("Failed to set default subscriber");
}

M src/main.rs => src/main.rs +26 -9
@@ 7,7 7,7 @@ use anyhow::{bail, Context, Result};
use async_std::task;
use git_testament::{git_testament, render_testament};
use std::env;
use tracing::{debug, error, info};
use tracing::{error, info};

use crate::twitch::TwitchAuth;



@@ 34,9 34,12 @@ fn env_help() -> &'static str {
Environment variables:
  CLIENT_ID:     Twitch Client ID for querying feeds
  CLIENT_SECRET: Twitch Client Secret for querying feeds
  LISTEN:        Endpoint for HTTP service to listen on
                 (default: '0.0.0.0:8080')
  LOG_LEVEL:     error/warn/info/debug/trace/off";
  LOG_LEVEL:     error/warn/info/debug/trace/off

HTTP mode only:
  LISTEN:           Endpoint for HTTP service to listen on (default: '0.0.0.0:8080')
  USER_CACHE_SECS:  Duration in seconds to cache Twitch account info (default: 86400)
  VIDEO_CACHE_SECS: Duration in seconds to cache video lists (default: 600)";
}

fn main() -> Result<()> {


@@ 56,7 59,7 @@ fn main() -> Result<()> {
            task::block_on(async move {
                let user: twitch::GetUsersEntry;
                let videos: Vec<twitch::GetVideosEntry>;
                match twitch::get_user(&mut twitch_auth, username.clone()).await {
                match twitch::get_user(&mut twitch_auth, &username).await {
                    Ok(u) => {
                        user = u;
                    }


@@ 65,7 68,6 @@ fn main() -> Result<()> {
                        return;
                    }
                };
                debug!("{:?}", user);
                match twitch::get_videos(&mut twitch_auth, &user.id).await {
                    Ok(v) => {
                        videos = v;


@@ 78,9 80,18 @@ fn main() -> Result<()> {
                        return;
                    }
                };
                debug!("{:?}", videos);
                println!("{:?}", videos);
                // TODO rss to stdout
                match rss::to_rss(
                    &user,
                    &videos,
                    &optenv("THUMBNAIL_WIDTH", "640"),
                    &optenv("THUMBNAIL_HEIGHT", "360"),
                ) {
                    Ok(content) => println!("{}", content),
                    Err(e) => {
                        error!("Failed to render RSS: {}", e);
                        return;
                    }
                };
            });
            Ok(())
        }


@@ 94,6 105,12 @@ fn main() -> Result<()> {
                ),
                optenv("THUMBNAIL_WIDTH", "640"),
                optenv("THUMBNAIL_HEIGHT", "360"),
                optenv("USER_CACHE_SECS", "86400")
                    .parse::<u64>()
                    .with_context(|| "USER_CACHE_SECS must be an unsigned int")?,
                optenv("VIDEO_CACHE_SECS", "600")
                    .parse::<u64>()
                    .with_context(|| "VIDEO_CACHE_SECS must be an unsigned int")?,
            )
        }
    }

M src/rss.rs => src/rss.rs +27 -19
@@ 11,26 11,31 @@ pub fn to_rss(
) -> Result<String> {
    let mut items = vec![];
    for video in videos {
        let mut item_builder = ItemBuilder::default();
        item_builder
            .title(format!("{} ({})", video.title, video.duration))
            .content(format!(
                "<p><img src='{}'/></p><p>{}</p>",
                video
                    .thumbnail_url
                    .replace("%{width}", thumbnail_width)
                    .replace("%{height}", thumbnail_height),
                video.duration
            ))
            .link(video.url.to_string())
            .pub_date(video.published_at.to_string());
        if !video.description.is_empty() {
            item_builder.description(video.description.to_string());
        }
        items.push(
            ItemBuilder::default()
                .title(video.title.clone())
                .content(format!(
                    "<p><img src='{}'/></p><p>{}</p>",
                    video
                        .thumbnail_url
                        .replace("%{width}", thumbnail_width)
                        .replace("%{height}", thumbnail_height),
                    video.duration
                ))
                .link(video.url.clone())
                .description(format!("{:?}", video)) // TODO TEMP
                .pub_date(video.published_at.clone())
            item_builder
                .build()
                .map_err(|e| anyhow!("Error when rendering RSS item: {}", e))?,
        );
    }
    let channel = ChannelBuilder::default()
        .title(format!("Twitch: {}", user.display_name))
    let mut channel_builder = ChannelBuilder::default();
    channel_builder
        .title(user.display_name.to_string())
        .image(Some(
            ImageBuilder::default()
                .url(user.profile_image_url.as_str())


@@ 38,10 43,13 @@ pub fn to_rss(
                .map_err(|e| anyhow!("Error when rendering RSS channel image: {}", e))?,
        ))
        .link(format!("https://twitch.tv/{}", user.login))
        .description(user.description.as_str())
        .generator("twitch-rss".to_string())
        .items(items)
        .items(items);
    if !user.description.is_empty() {
        channel_builder.description(user.description.as_str());
    }
    Ok(channel_builder
        .build()
        .map_err(|e| anyhow!("Error when rendering RSS channel: {}", e))?;
    Ok(channel.to_string())
        .map_err(|e| anyhow!("Error when rendering RSS channel: {}", e))?
        .to_string())
}

M src/twitch.rs => src/twitch.rs +4 -4
@@ 49,7 49,7 @@ struct GetUsersResponse {
"view_count":102251,
"created_at":"2021-05-20T22:34:07.072555Z"
*/
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct GetUsersEntry {
    pub id: String,
    pub login: String,


@@ 89,7 89,7 @@ struct GetVideosResponse {
"duration": "3h11m2s",
"muted_segments": null
*/
#[derive(Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize)]
pub struct GetVideosEntry {
    pub id: String,
    pub title: String,


@@ 196,9 196,9 @@ impl TwitchAuth {

// see also https://dev.twitch.tv/docs/api/reference#get-users
// curl -v -H 'Client-Id: CLIENT_ID' -H 'Authorization: Bearer AUTH_TOKEN' "https://api.twitch.tv/helix/users?login=USER_NAME"
pub async fn get_user(auth: &mut TwitchAuth, user_name: String) -> Result<GetUsersEntry> {
pub async fn get_user(auth: &mut TwitchAuth, user_name: &str) -> Result<GetUsersEntry> {
    let query = GetUsersQuery {
        login: user_name.clone(),
        login: user_name.to_string(),
    };
    let mut response = send(helix_request("users", &query, auth).await?).await?;
    if !response.status().is_success() {