~jojo/huelia

b6763fb9ccf3f52da95c64a8eb4a58375426cb0b — JoJo 1 year, 22 days ago f06879f
better snooze & led flash reduction
4 files changed, 104 insertions(+), 120 deletions(-)

M Cargo.lock
M Cargo.toml
M performer/src/main.rs
M performer/src/mqtt.rs
M Cargo.lock => Cargo.lock +2 -2
@@ 1105,7 1105,7 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"

[[package]]
name = "huelia-conductor"
version = "0.4.0"
version = "0.4.1"
dependencies = [
 "hostname",
 "http",


@@ 1123,7 1123,7 @@ dependencies = [

[[package]]
name = "huelia-performer"
version = "0.4.0"
version = "0.4.1"
dependencies = [
 "anyhow",
 "embedded-hal 1.0.0-rc.1",

M Cargo.toml => Cargo.toml +1 -1
@@ 5,7 5,7 @@ resolver = "2"

[workspace.package]
authors = ["JoJo <jo@jo.zone>"]
version = "0.4.0"
version = "0.4.1"
edition = "2021"

[workspace.dependencies]

M performer/src/main.rs => performer/src/main.rs +93 -99
@@ 2,12 2,12 @@

use huelia_performer::{mqtt::*, net::*, ota::*, *};

use anyhow::{anyhow, Result};
use anyhow::{anyhow, bail, Result};
use embedded_svc::http::client::Client as HttpClient;
use esp_idf_hal::{gpio::*, peripherals::Peripherals};
use esp_idf_svc::{
    http::client::{Configuration as HttpConfiguration, EspHttpConnection},
    nvs::{EspCustomNvsPartition, EspNvs},
    nvs::{EspCustomNvsPartition, EspNvs, NvsCustom},
    ota::EspOta,
};
// If using the `binstart` feature of `esp-idf-sys`, always keep this module imported


@@ 31,19 31,38 @@ const NAME_PREFIX_MAX_LEN: usize = 32;
const COLOR_RING_LEN: usize = 16;

fn main() {
    match main_() {
        Ok(()) => log::info!("main returned Ok"),
        Err(e) => log::error!("main returned Err: {e}"),
    }
    restart()
}

fn main_() -> Result<()> {
    SimpleLogger::new().init().unwrap();
    esp_idf_sys::link_patches(); // hack to make sure that a few patches are linked into the final executable

    let peripherals = Peripherals::take().unwrap();
    let pins = peripherals.pins;
    let mut white_led = PinDriver::output(pins.gpio19).unwrap();
    white_led.set_low().unwrap();
    log::info!("Good morning, let's bake this bread!");
    let storage = Arc::new(Mutex::new({
        let (namespace, rw) = ("huelia", true);
        EspNvs::new(EspCustomNvsPartition::take("nvs_ext").unwrap(), namespace, rw).unwrap()
    }));
    while let Err(e) = main_(storage.clone()) {
        log::error!("main returned Err: {e}");
        let mut storage = storage.lock().unwrap();
        let max_quick_attempts = 5;
        let quick_snooze = Duration::from_secs(2);
        let slow_snooze = Duration::from_secs(10 * 60);
        let attempts = read_start_attempts(&mut storage).unwrap();
        write_start_attempts(&mut storage, attempts.saturating_add(1)).ok();
        if attempts < max_quick_attempts {
            log::info!("fewer startup attempts than {max_quick_attempts}. quick snooze before reset");
            sleep(quick_snooze);
        } else {
            log::info!("too many failed restarts (attempts {attempts} > max {max_quick_attempts}). will go sleep for much longer before attempting a restart again. less annoying chatter & flashing of the white LED that way");
            sleep(slow_snooze);
        }
        restart()
    }
    log::info!("main returned Ok. Powering off - no restart.")
}

fn main_(storage: Arc<Mutex<EspNvs<NvsCustom>>>) -> Result<()> {
    let wifi = connect_wifi(env!("HUELIA_PERFORMER_WIFI_SSID"), env!("HUELIA_PERFORMER_WIFI_PASSWORD"))?;

    let _ntp = esp_idf_svc::sntp::EspSntp::new(&esp_idf_svc::sntp::SntpConf {


@@ 52,74 71,65 @@ fn main_() -> Result<()> {
        sync_mode: esp_idf_svc::sntp::SyncMode::Immediate,
    })?;

    let mut storage = {
        let (namespace, rw) = ("huelia", true);
        EspNvs::new(EspCustomNvsPartition::take("nvs_ext")?, namespace, rw)
    }?;

    // == RGB LED (timers & PWM) ==

    let peripherals = Peripherals::take().unwrap();
    let pins = peripherals.pins;

    let mut white_led = PinDriver::output(pins.gpio19)?;
    white_led.set_low()?;

    let mut led_strip = {
        let (chan, pin) = (0, 0);
        Ws2812Esp32Rmt::new(chan, pin)
    }?;

    // == Read storage ==
    log::info!("reading storage");
    let (name_prefix, name) = {
        let mac = wifi.sta_netif().get_mac()?;
        let suffix = format!(
            "ESP32C3-{:02X}:{:02X}:{:02X}:{:02X}:{:02X}:{:02X}",
            mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]
        );
        let mut prefix = [0u8; NAME_PREFIX_MAX_LEN];
        match storage.get_raw("name-prefix", &mut prefix)? {
            Some(prefix) => match std::str::from_utf8(prefix) {
                Ok(prefix) => (Some(prefix.to_owned()), format!("{prefix}-{suffix}")),
                Err(e) => {
                    log::error!("Error reading name-prefix from nvs as utf8\n{e}\nUsing unprefixed name");
                    (None, suffix)
    let (name_prefix, name, n_leds, current_limit_ma) = {
        let mut storage = storage.lock().unwrap();
        log::info!("reading storage");
        let (name_prefix, name) = {
            let mac = wifi.sta_netif().get_mac()?;
            let suffix = format!(
                "ESP32C3-{:02X}:{:02X}:{:02X}:{:02X}:{:02X}:{:02X}",
                mac[0], mac[1], mac[2], mac[3], mac[4], mac[5]
            );
            let mut prefix = [0u8; NAME_PREFIX_MAX_LEN];
            match storage.get_raw("name-prefix", &mut prefix)? {
                Some(prefix) => match std::str::from_utf8(prefix) {
                    Ok(prefix) => (Some(prefix.to_owned()), format!("{prefix}-{suffix}")),
                    Err(e) => {
                        log::error!("Error reading name-prefix from nvs as utf8\n{e}\nUsing unprefixed name");
                        (None, suffix)
                    }
                },
                None => (None, suffix),
            }
        };
        log::info!("name: \"{name}\"");
        let n_leds = {
            let mut buf = [0u8; 2];
            match storage.get_raw("n-leds", &mut buf) {
                Ok(Some(_)) => u16::from_le_bytes(buf) as usize,
                Ok(None) => 1,
                Err(e) if e.code() == ESP_ERR_NVS_INVALID_LENGTH => {
                    log::error!("invalid length error when reading \"n-leds\" from nvs. The field length might've changed between updates. Setting it to the default value.");
                    storage.set_raw("n-leds", &1u16.to_le_bytes())?;
                    restart()
                }
            },
            None => (None, suffix),
        }
    };
    log::info!("name: \"{name}\"");
    let n_leds = {
        let mut buf = [0u8; 2];
        match storage.get_raw("n-leds", &mut buf) {
            Ok(Some(_)) => u16::from_le_bytes(buf) as usize,
            Ok(None) => 1,
            Err(e) if e.code() == ESP_ERR_NVS_INVALID_LENGTH => {
                log::error!("invalid length error when reading \"n-leds\" from nvs. The field length might've changed between updates. Setting it to the default value.");
                storage.set_raw("n-leds", &1u16.to_le_bytes())?;
                restart()
                Err(e) => Err(e)?,
            }
            Err(e) => Err(e)?,
        }
    };
    log::info!("n leds: {n_leds}");
    let current_limit_ma = {
        let mut buf = [0u8; 2];
        let default_limit_ma = 250u16;
        match storage.get_raw("current-limit", &mut buf) {
            Ok(Some(_)) => u16::from_le_bytes(buf),
            Ok(None) => default_limit_ma,
            Err(e) if e.code() == ESP_ERR_NVS_INVALID_LENGTH => {
                log::error!("invalid length error when reading \"current-limit\" from nvs. The field length might've changed between updates. Setting it to the default value.");
                storage.set_raw("current-limit", &default_limit_ma.to_le_bytes())?;
                restart()
        };
        log::info!("n leds: {n_leds}");
        let current_limit_ma = {
            let mut buf = [0u8; 2];
            let default_limit_ma = 250u16;
            match storage.get_raw("current-limit", &mut buf) {
                Ok(Some(_)) => u16::from_le_bytes(buf),
                Ok(None) => default_limit_ma,
                Err(e) if e.code() == ESP_ERR_NVS_INVALID_LENGTH => {
                    log::error!("invalid length error when reading \"current-limit\" from nvs. The field length might've changed between updates. Setting it to the default value.");
                    storage.set_raw("current-limit", &default_limit_ma.to_le_bytes())?;
                    restart()
                }
                Err(e) => Err(e)?,
            }
            Err(e) => Err(e)?,
        }
        };
        log::info!("current limit mA: {current_limit_ma}");
        (name_prefix, name, n_leds, current_limit_ma)
    };
    log::info!("current limit mA: {current_limit_ma}");

    // == HTTP client ==
    {


@@ 132,6 142,7 @@ fn main_() -> Result<()> {
    }

    let (color_tx, color_rx) = mpsc::sync_channel::<Srgb<u8>>(COLOR_RING_LEN);
    color_tx.send(Srgb::new(0, 0, 0)).unwrap();
    let color_chan_full = Arc::new(AtomicBool::new(false));
    let color_chan_full1 = color_chan_full.clone();



@@ 141,7 152,7 @@ fn main_() -> Result<()> {
    log::info!("starting display thread");
    std::thread::Builder::new()
        .stack_size(4096)
        .spawn(move || {
        .spawn(move || -> Result<()> {
            let (min_delay, max_delay) = (Duration::from_millis(8), Duration::from_millis(80));
            let (mut too_short_delay, mut too_long_delay) = (min_delay, max_delay);
            let mut delay = (min_delay + max_delay) / 2;


@@ 171,7 182,7 @@ fn main_() -> Result<()> {
                        delay = (too_short_delay + too_long_delay) / 2;
                        too_long_delay = min(max_delay, too_long_delay + delay / 10 + Duration::from_millis(1));
                    }
                    Err(e) => panic!("{}", e),
                    Err(e) => bail!("{}", e),
                }
                sleep(delay);
            }


@@ 194,13 205,12 @@ fn main_() -> Result<()> {
    }

    log::info!("setting up mqtt");
    let storage1 = Arc::new(Mutex::new(storage));
    let name_prefix1 = name_prefix.clone();
    let subscriptions = || {
        let name_prefix1 = name_prefix1.clone();
        let storage3 = storage1.clone();
        let storage2 = storage1.clone();
        let storage1 = storage1.clone();
        let storage3 = storage.clone();
        let storage2 = storage.clone();
        let storage1 = storage.clone();
        let color_tx1 = color_tx.clone();
        let color_tx = color_tx.clone();
        let color_chan_full1 = color_chan_full.clone();


@@ 281,33 291,17 @@ fn main_() -> Result<()> {
            ),
        ]
    };
    while let Err(e) = mqtt::connect(
    let on_connected = || {
        let mut storage = storage.lock().unwrap();
        crate::write_start_attempts(&mut storage, 0).unwrap()
    };
    mqtt::connect(
        &name,
        &format!("mqtt://{}", env!("HUELIA_MQTT_HOST")),
        (name_prefix.as_deref(), n_leds, current_limit_ma),
        subscriptions().into_iter(),
        storage1.clone(),
    ) {
        use mqtt::ConnectErr::*;
        match e {
            InitFailure => log::error!("failed to init the mqtt connection"),
            NoNextEvent => log::error!("mqtt connection event stream empty"),
            Disconnected => log::error!("mqtt disconnected"),
        }
        let mut storage = storage1.lock().unwrap();
        let attempts = read_start_attempts(&mut storage)?;
        let max = 6u8;
        let delay = Duration::from_secs(1) * attempts.max(max) as u32;
        if attempts < max {
            log::info!("fewer startup attempts than {max}. restarting in {delay:?}");
            sleep(delay);
            restart()
        } else {
            log::info!("too many failed restarts (attempts {attempts} > max {max}). will sleep a bit and just retry connecting to mqtt to not keep flashing the white LED annoyingly");
            sleep(Duration::from_secs(5));
        }
    }
    Ok(())
        on_connected,
    )
}

fn color_handler(topic: &str, data: &[u8], tx: &mpsc::SyncSender<Srgb<u8>>, chan_full: &Arc<AtomicBool>) -> Result<()> {


@@ 317,7 311,7 @@ fn color_handler(topic: &str, data: &[u8], tx: &mpsc::SyncSender<Srgb<u8>>, chan
        match tx.try_send(color) {
            Ok(_) => (),
            Err(mpsc::TrySendError::Full(_)) => chan_full.store(true, Relaxed),
            Err(e) => panic!("{}", e),
            Err(e) => bail!("{}", e),
        }
    }
    Ok(())

M performer/src/mqtt.rs => performer/src/mqtt.rs +8 -18
@@ 7,12 7,10 @@ use std::{
    time::Duration,
};

use anyhow::{anyhow, Result};
pub use embedded_svc::mqtt::client::{Connection, Message, QoS};
use embedded_svc::mqtt::client::{Details, Event};
use esp_idf_svc::{
    mqtt::client::{EspMqttClient, LwtConfiguration, MqttClientConfiguration, MqttProtocolVersion},
    nvs::{EspNvs, NvsCustom},
};
use esp_idf_svc::mqtt::client::{EspMqttClient, LwtConfiguration, MqttClientConfiguration, MqttProtocolVersion};

const CLIENT_IN_BUF_SIZE: usize = 4 * 1024;
const CLIENT_OUT_BUF_SIZE: usize = 1024;


@@ 32,19 30,13 @@ pub enum Handler {
    StreamThread(usize, Arc<dyn Fn(&str, StreamReader) -> anyhow::Result<()> + Send + Sync>),
}

pub enum ConnectErr {
    InitFailure,
    NoNextEvent,
    Disconnected,
}

pub fn connect<I>(
    client_name: &str,
    server: &str,
    (prefix, n_leds, current_limit_ma): (Option<&str>, usize, u16),
    subscriptions: I,
    storage: Arc<Mutex<EspNvs<NvsCustom>>>,
) -> Result<(), ConnectErr>
    on_connected: impl Fn(),
) -> Result<()>
where
    I: Iterator<Item = (String, QoS, Handler)>,
{


@@ 74,9 66,9 @@ where
    let mut batch_handlers = BTreeMap::new();
    let mut streams = BTreeMap::new();

    let (mut client, mut events) = EspMqttClient::new_with_conn(server, &mqtt_conf).or(Err(ConnectErr::InitFailure))?;
    let (mut client, mut events) = EspMqttClient::new_with_conn(server, &mqtt_conf)?;
    loop {
        let event = events.next().ok_or(ConnectErr::NoNextEvent)?;
        let event = events.next().ok_or(anyhow!("mqtt connection event stream empty"))?;

        if batch_handlers.len() > MAX_STREAMS {
            batch_handlers.pop_first();


@@ 155,8 147,7 @@ where
                    client.subscribe(&topic, qos).unwrap();
                    log::debug!("sent subscribe request");
                }
                let mut storage = storage.lock().unwrap();
                crate::write_start_attempts(&mut storage, 0).unwrap()
                on_connected()
            }
            Ok(Event::Subscribed(_)) => {
                log::debug!("got subscribe ack");


@@ 167,8 158,7 @@ where
                }
            }
            Ok(Event::Disconnected) => {
                log::warn!("disconnected from mqtt broker");
                return Err(ConnectErr::Disconnected);
                return Err(anyhow!("disconnected from mqtt broker"));
            }
            Ok(event) => log::debug!("unhandled event: {event:?}"),
            Err(e) => log::error!("mqtt client message handler was passed an error: {e}"),