~goorzhel/sota-slack-spotter

ref: 1c0f4f508e717e07cf6ae5f89f15b04f108731fe sota-slack-spotter/src/main.rs -rw-r--r-- 4.0 KiB
1c0f4f50 — Antonio Gurgel 0.4.4 4 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::{
    env::var,
    sync::{Arc, Mutex},
    time::Duration,
};

use anyhow::{Context, Result};
use cache::new_safe_cache;
use log::{error, info};
use prometheus_exporter::prometheus::{
    opts, register_int_counter, register_int_counter_vec, register_int_gauge,
};
use tokio::{task, time::interval};

mod activation;
mod cache;
mod callsign;
mod slack;
mod sota;
mod tasks;
#[cfg(test)]
mod test;

use crate::{
    activation::Activations,
    cache::{Cachelike, SafeCache},
    callsign::Callsigns,
    slack::Slack,
    sota::{alert::Alert, spot::Spot},
    tasks::{fetch_callsigns, process_alerts, process_spots},
};

/// Program entry point.
///
/// Sets up logging, the Slack client, the on-disk caches and the Prometheus
/// exporter, then spawns two periodic background tasks:
///
/// * an hourly task that calls `fetch_callsigns` and publishes the returned
///   count on the `callsigns` gauge;
/// * a per-minute task that calls `process_spots` and `process_alerts` and
///   adds the returned counts to the `spots` / `alerts` counters.
///
/// Any error returned by those calls is logged and counted on the `errors`
/// counter under a `cause` label; it does not stop the task.
///
/// # Errors
///
/// Returns an error if the Slack client, a cache, the Prometheus exporter or a
/// metric registration fails during start-up, or if either background task
/// terminates abnormally (e.g. by panicking).
#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();
    info!("sota-slack-spotter version {}", env!("CARGO_PKG_VERSION"));
    let slack = Arc::new(Slack::from_env().context("Couldn't init Slack client")?);

    // Initialize caches.
    // Cache files live under $CACHE_DIR (default: /tmp) as `<name>.json`.
    let cache_dir = var("CACHE_DIR").unwrap_or_else(|_| "/tmp".into());
    let cache_dir = |filename: &str| format!("{}/{}.json", cache_dir, filename);

    let mut activations = Activations::new(cache_dir("activations"));
    let alerts: SafeCache<Alert> = new_safe_cache(cache_dir("alerts"));
    let callsigns = Arc::new(Mutex::new(Callsigns::new()));
    let spots: SafeCache<Spot> = new_safe_cache(cache_dir("spots"));

    activations.init()?;
    alerts.lock().unwrap().init()?;
    spots.lock().unwrap().init()?;

    // Start Prometheus.
    // The address is a compile-time literal, so the parse cannot fail at runtime.
    prometheus_exporter::start("0.0.0.0:9000".parse().unwrap())?;
    let alerts_counter = register_int_counter!("alerts", "Relevant alerts seen so far")?;
    let callsign_gauge = register_int_gauge!("callsigns", "Callsigns parsed from Slack user list")?;
    let errors_counter =
        register_int_counter_vec!(opts!("errors", "Errors encountered"), &["cause"])?;
    let errors_counter = Arc::new(Mutex::new(errors_counter));
    let spots_counter = register_int_counter!("spots", "Relevant spots seen so far")?;

    // Define tasks.

    // Each spawned task takes ownership of what it captures (`async move`), so
    // the values shared by both tasks are cloned up front: the originals go to
    // the hourly task, the `_clone` handles to the per-minute task.
    // https://stackoverflow.com/a/71190593
    // TODO: Any better way to do this?
    let callsigns_clone = Arc::clone(&callsigns);
    let slack_clone = Arc::clone(&slack);
    let errors_counter_clone = errors_counter.clone();

    let callsigns_every_hour = task::spawn(async move {
        let mut interval = interval(Duration::from_secs(3600));
        loop {
            interval.tick().await;
            // The guard is taken after the await and dropped at the end of the
            // iteration, so the std mutex is never held across an await point.
            let mut callsigns = callsigns.lock().unwrap();
            match fetch_callsigns(&slack, &mut callsigns) {
                Ok(count) => callsign_gauge.set(count.try_into().unwrap()),
                Err(e) => {
                    error!("Error while fetching callsigns: {:?}", e);
                    let ec = errors_counter.lock().unwrap();
                    ec.with_label_values(&["callsigns"]).inc();
                }
            }
        }
    });

    let sota_every_minute = task::spawn(async move {
        let mut interval = interval(Duration::from_secs(60));
        loop {
            interval.tick().await;
            // All three guards are released at the end of each iteration,
            // before the next await.
            let mut spots = spots.lock().unwrap();
            let mut alerts = alerts.lock().unwrap();
            let callsigns = callsigns_clone.lock().unwrap();

            match process_spots(&slack_clone, &callsigns, &mut activations, &mut spots) {
                Ok(c) => spots_counter.inc_by(c.try_into().unwrap()),
                Err(e) => {
                    error!("Error while processing spots: {:?}", e);
                    let ec = errors_counter_clone.lock().unwrap();
                    ec.with_label_values(&["spots"]).inc();
                }
            }
            match process_alerts(&slack_clone, &callsigns, &mut alerts) {
                Ok(c) => alerts_counter.inc_by(c.try_into().unwrap()),
                Err(e) => {
                    error!("Error while processing alerts: {:?}", e);
                    let ec = errors_counter_clone.lock().unwrap();
                    ec.with_label_values(&["alerts"]).inc();
                }
            }
        }
    });

    // Both tasks loop forever, so a handle only resolves when its task has
    // died (panic or cancellation). Wait on both handles concurrently:
    // awaiting them one after the other would leave `main` parked on the
    // hourly task and blind to a failure of the per-minute task.
    tokio::try_join!(callsigns_every_hour, sota_every_minute)
        .context("Background task terminated unexpectedly")?;
    Ok(())
}