~nickbp/nikau

0e7dbc1d0c3464dcb1739cb439940eae8acb4819 — Nick Parker 6 months ago 54111d4
Drop the last button press from a combo at the server too: Always grab keyboards at the server, while mice/touchpads continue to pass through
M examples/uinput.rs => examples/uinput.rs +7 -8
@@ 10,19 10,19 @@ use tokio::task;
use tracing::{error, info, warn};

use nikau::device::output::{uinput, OutputHandler};
use nikau::device::{util, watch};
use nikau::device::{handles, util, watch, GrabEvent};
use nikau::logging;
use nikau::msgs::event;

struct StubHandler {}

impl watch::DeviceHandler for StubHandler {
impl handles::DeviceHandler for StubHandler {
    fn handle_device_stream(
        &mut self,
        mut stream: evdev::EventStream,
        _grab_rx: broadcast::Receiver<watch::GrabEvent>,
        _grab_rx: Option<broadcast::Receiver<GrabEvent>>,
        _device_info: util::DeviceInfo,
    ) -> Result<watch::DeviceHandle> {
    ) -> Result<handles::DeviceHandle> {
        let handle = tokio::spawn(async move {
            let device_name = stream
                .device()


@@ 40,7 40,7 @@ impl watch::DeviceHandler for StubHandler {
                }
            }
        });
        Ok(watch::DeviceHandle { handle })
        Ok(handles::DeviceHandle { handle })
    }
}



@@ 57,10 57,9 @@ async fn main() -> Result<()> {
    };

    let (grab_tx, _grab_rx) = broadcast::channel(32);
    let handles = handles::DeviceHandles::new(StubHandler {}, grab_tx, HashSet::<Key>::new());
    let handler = task::spawn(async move {
        if let Err(e) =
            watch::watch_loop(StubHandler {}, grab_tx, devices, &HashSet::<Key>::new()).await
        {
        if let Err(e) = watch::watch_loop(handles, devices).await {
            error!("Input device watch failure: {:?}", e);
        }
    });

M src/client.rs => src/client.rs +1 -1
@@ 82,7 82,7 @@ struct Connection {

    active: bool,

    /// Reusable buffer for receiving keyboard events.
    /// Reusable buffer for receiving input events.
    event_bytes: Vec<u8>,
    /// Reusable buffer for receiving bulk data (clipboards).
    bulk_recv_bytes: Vec<u8>,

A src/device/handles.rs => src/device/handles.rs +118 -0
@@ 0,0 1,118 @@
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};

use anyhow::{Context, Result};
use evdev::{Device, EventStream, Key};
use tokio::sync::broadcast;
use tokio::task;
use tracing::debug;

use crate::device;
use crate::device::util;

pub struct DeviceHandle {
    pub handle: task::JoinHandle<()>,
}

/// Trait for watching the addition and removal of devices from the machine
pub trait DeviceHandler: Send + 'static {
    fn handle_device_stream(
        &mut self,
        events: EventStream,
        grab_rx: Option<broadcast::Receiver<device::GrabEvent>>,
        device_info: util::DeviceInfo,
    ) -> Result<DeviceHandle>;
}

pub struct DeviceHandles<H: DeviceHandler> {
    /// Devices which support one or more keys specified in client switch key combos.
    /// These devices are always grabbed at the server so that we can consistently
    /// grab/"swallow" the key combo input when the local server is the active target.
    always_grabbed_devices: HashMap<PathBuf, DeviceHandle>,

    /// Devices which don't support one or more key combo keys, such as mice.
    /// When the local server is the active target, nikau ungrabs the device and allows
    /// its input to pass through directly.
    toggled_devices: HashMap<PathBuf, DeviceHandle>,

    handler: H,

    /// Method for subscribing devices to grab events
    grab_tx: broadcast::Sender<device::GrabEvent>,

    /// All distinct keys used in client switch key combos, for internal accounting.
    all_combo_keys: HashSet<Key>,
}

impl<H: DeviceHandler> DeviceHandles<H> {
    pub fn new(handler: H, grab_tx: broadcast::Sender<device::GrabEvent>, all_combo_keys: HashSet<Key>) -> DeviceHandles<H> {
        DeviceHandles {
            always_grabbed_devices: HashMap::<PathBuf, DeviceHandle>::new(),
            toggled_devices: HashMap::<PathBuf, DeviceHandle>::new(),
            handler,
            grab_tx,
            all_combo_keys,
        }
    }

    pub(crate) fn add(&mut self, path: &PathBuf, device: Device) -> Result<()> {
        let device_info = util::DeviceInfo::new(&device, false);
        util::log_device_info(&device, &path, &device_info, "Listening to device", true);
        let supports_any_keys = supports_any_keys(&device, &self.all_combo_keys);
        if supports_any_keys {
            debug!("Device supports one or more configured combo keys: {}", device.name().unwrap_or("(Unnamed device)"));
        }
        if supports_any_keys {
            // This device supports one or more keys configured for client switch key combinations.
            // We should grab/route its input via nikau so that we can omit keypresses from the combos.
            let join_handle = self.handler.handle_device_stream(
                start_device_stream(device, path)?,
                None,
                device_info,
            )?;
            self.always_grabbed_devices.insert(path.clone(), join_handle);
        } else {
            // This device doesn't support keys used in key combinations (e.g. a mouse).
            // When the server is the active input, we can ungrab the device,
            // letting its input pass through directly.
            let join_handle = self.handler.handle_device_stream(
                start_device_stream(device, path)?,
                Some(self.grab_tx.subscribe()),
                device_info,
            )?;
            self.toggled_devices.insert(path.clone(), join_handle);
        }
        Ok(())
    }

    pub(crate) fn remove(&mut self, path: &PathBuf) -> Option<DeviceHandle> {
        if let Some(handle) = self.always_grabbed_devices.remove(path) {
            return Some(handle);
        }
        self.toggled_devices.remove(path)
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.always_grabbed_devices.is_empty() && self.toggled_devices.is_empty()
    }
}

fn supports_any_keys(d: &Device, all_combo_keys: &HashSet<Key>) -> bool {
    if let Some(device_keys) = d.supported_keys() {
        for key in all_combo_keys.iter() {
            if device_keys.contains(*key) {
                return true;
            }
        }
    }
    false
}

fn start_device_stream(device: Device, path: &Path) -> Result<EventStream> {
    device.into_event_stream().with_context(|| {
        format!(
            "Failed to initialize async fd for device: {}",
            path.to_string_lossy()
        )
    })
}

M src/device/input.rs => src/device/input.rs +132 -68
@@ 6,8 6,8 @@ use tokio::sync::{broadcast, mpsc};
use tokio::task;
use tracing::{debug, info, trace, warn};

use crate::device::watch::{DeviceHandle, DeviceHandler, GrabEvent};
use crate::device::{shortcut, util, Event};
use crate::device::handles::{DeviceHandle, DeviceHandler};
use crate::device::{shortcut, util, Event, GrabEvent, InputBatch};
use crate::msgs::event;

pub struct InputHandler {


@@ 76,77 76,67 @@ impl DeviceHandler for InputHandler {
    /// Spawns a task for listening to a device's events and for controlling its grab state.
    fn handle_device_stream(
        &mut self,
        mut events: EventStream,
        grab_rx: broadcast::Receiver<GrabEvent>,
        device_info: util::DeviceInfo,
        mut stream: EventStream,
        grab_rx: Option<broadcast::Receiver<GrabEvent>>,
        mut device_info: util::DeviceInfo,
    ) -> Result<DeviceHandle> {
        let config = self.config.clone();
        let handle = task::spawn(async move {
            read_device_events(&mut events, config, grab_rx, device_info).await
        });
        let handle = if let Some(grab_rx) = grab_rx {
            // Device has grab toggling enabled
            task::spawn(async move {
                read_device_or_grab_events(&mut stream, config, grab_rx, device_info).await
            })
        } else {
            // Device is to be permanently grabbed
            handle_grab_event(&mut stream, &mut device_info, GrabEvent::Grab);
            task::spawn(async move {
                read_device_events(&mut stream, config, device_info).await
            })
        };
        Ok(DeviceHandle { handle })
    }
}

async fn read_device_events(
    stream: &mut EventStream,
    mut c: HandlerConfig,
    mut grab_rx: broadcast::Receiver<GrabEvent>,
    mut handler_config: HandlerConfig,
    device_info: util::DeviceInfo,
) {
    let mut input_events_batch = Vec::new();
    let mut combo_events_batch = Vec::new();
    loop {
        match stream.next_event().await {
            Ok(event) => {
                handle_input_event(stream, &mut handler_config, event, &device_info, &mut input_events_batch, &mut combo_events_batch).await
            }
            Err(e) => {
                // Common when the device has been unplugged.
                // We'll frequently get this error just as inotify is telling us the file is deleted.
                // Exit to avoid an infinite loop on trying to read the missing file.
                info!(
                    "Got an error event for {:?}, removing device (might be unplugged?): {}",
                    stream.device().name().unwrap_or("(Unnamed device)"),
                    e
                );
            }
        }
    }
}

async fn read_device_or_grab_events(
    stream: &mut EventStream,
    mut handler_config: HandlerConfig,
    mut grab_rx: broadcast::Receiver<GrabEvent>,
    mut device_info: util::DeviceInfo,
) {
    let mut input_events_batch = Vec::new();
    let mut combo_events_batch = Vec::new();
    loop {
        tokio::select! {
            event = stream.next_event() => {
                match event {
                    Ok(event) => {
                        // 100 limit: Just in case, avoid the risk of collecting queued events forever.
                        //            In practice we should only be collecting 2-3 events between syncs.
                        if event.event_type() == EventType::SYNCHRONIZATION
                            || (input_events_batch.len() + combo_events_batch.len()) >= 100 {
                            // Flush events to be handled by the client as a group
                            if !input_events_batch.is_empty() {
                                if let Err(e) = c.event_tx.send(Event::Input(input_events_batch)).await {
                                    warn!("Error sending input events for routing: {:?}", e);
                                }
                                input_events_batch = Vec::new();
                            }
                            // Follow original events with event(s) from combo completion(s)
                            if !combo_events_batch.is_empty() {
                                for combo_event in combo_events_batch {
                                    if let Err(e) = c.event_tx.send(combo_event).await {
                                        warn!("Error sending combo events for routing: {:?}", e);
                                    }
                                }
                                combo_events_batch = Vec::new();
                            }
                        } else {
                            // Check whether this event completes a key combo, which creates an additional event.
                            // No short-circuit: Ensure that all combo_states have a chance to be updated
                            let mut any_consume = false;
                            for cs in c.combo_states.iter_mut() {
                                match cs.check_combo(&event) {
                                    shortcut::ComboAction::ConsumeEvent => {
                                        any_consume = true;
                                    }
                                    shortcut::ComboAction::PassEvent => {
                                    }
                                    shortcut::ComboAction::ConsumeEventAndEmitAction(action) => {
                                        any_consume = true;
                                        combo_events_batch.push(action);
                                    }
                                    shortcut::ComboAction::PassEventAndEmitAction(action) => {
                                        combo_events_batch.push(action);
                                    }
                                }
                            }
                            if any_consume {
                                debug!("Dropping key event as it's the last key completing one or more combos: {:?}", event);
                            } else {
                                input_events_batch.push(convert_device_event(event, stream.device(), &device_info))
                            }
                        }
                        handle_input_event(stream, &mut handler_config, event, &device_info, &mut input_events_batch, &mut combo_events_batch).await
                    }
                    Err(e) => {
                        // Common when the device has been unplugged.


@@ 157,22 147,14 @@ async fn read_device_events(
                            stream.device().name().unwrap_or("(Unnamed device)"),
                            e
                        );
                        return;
                    }
                }
            },
            }
            grab = grab_rx.recv() => {
                match grab {
                    Ok(GrabEvent::Grab) => {
                        debug!("Grabbing device: {:?}", stream.device().name().unwrap_or("(Unnamed device)"));
                        if let Err(e) = stream.device_mut().grab() {
                            panic!("Failed to grab device {:?}: {:?}", stream.device().name(), e);
                        }
                    }
                    Ok(GrabEvent::Ungrab) => {
                        debug!("Ungrabbing device: {:?}", stream.device().name().unwrap_or("(Unnamed device)"));
                        if let Err(e) = stream.device_mut().ungrab() {
                            panic!("Failed to ungrab device {:?}: {:?}", stream.device().name(), e);
                    Ok(grab) => {
                        if !handle_grab_event(stream, &mut device_info, grab) {
                            return
                        }
                    }
                    Err(e) => {


@@ 190,6 172,88 @@ async fn read_device_events(
    }
}

async fn handle_input_event(
    stream: &mut EventStream,
    c: &mut HandlerConfig,
    event: evdev::InputEvent,
    device_info: &util::DeviceInfo,
    input_events_batch: &mut Vec<event::InputEvent>,
    combo_events_batch: &mut Vec<Event>,
) {
    // 100 limit: Just in case, avoid the risk of collecting queued events forever.
    //            In practice we should only be collecting 2-3 events between syncs.
    if event.event_type() == EventType::SYNCHRONIZATION
        || (input_events_batch.len() + combo_events_batch.len()) >= 100 {
        // Flush events to be handled by the client as a group
        if !input_events_batch.is_empty() {
            let event = Event::Input(InputBatch {
                events: std::mem::replace(input_events_batch, Vec::new()),
                is_grabbed: device_info.is_grabbed,
            });
            if let Err(e) = c.event_tx.send(event).await {
                warn!("Error sending input events for routing: {:?}", e);
            }
        }
        // Follow original events with event(s) from combo completion(s)
        if !combo_events_batch.is_empty() {
            let batch = std::mem::replace(combo_events_batch, Vec::new());
            for combo_event in batch {
                if let Err(e) = c.event_tx.send(combo_event).await {
                    warn!("Error sending combo events for routing: {:?}", e);
                }
            }
        }
    } else {
        // Check whether this event completes a key combo, which creates an additional event.
        // No short-circuit: Ensure that all combo_states have a chance to be updated
        let mut any_consume = false;
        for cs in c.combo_states.iter_mut() {
            match cs.check_combo(&event) {
                shortcut::ComboAction::ConsumeEvent => {
                    any_consume = true;
                }
                shortcut::ComboAction::PassEvent => {
                }
                shortcut::ComboAction::ConsumeEventAndEmitAction(action) => {
                    any_consume = true;
                    combo_events_batch.push(action);
                }
                shortcut::ComboAction::PassEventAndEmitAction(action) => {
                    combo_events_batch.push(action);
                }
            }
        }
        if any_consume {
            debug!("Dropping key event as it's the last key completing one or more combos: {:?}", event);
        } else {
            input_events_batch.push(convert_device_event(event, stream.device(), device_info))
        }
    }
}

fn handle_grab_event(stream: &mut EventStream, device_info: &mut util::DeviceInfo, grab: GrabEvent) -> bool {
    match grab {
        GrabEvent::Grab => {
            debug!("Grabbing device: {:?}", stream.device().name().unwrap_or("(Unnamed device)"));
            if let Err(e) = stream.device_mut().grab() {
                warn!("Failed to grab device {:?}, removing device: {}", stream.device().name(), e);
                return false
            }
            device_info.is_grabbed = true;
            return true
        }
        GrabEvent::Ungrab => {
            debug!("Ungrabbing device: {:?}", stream.device().name().unwrap_or("(Unnamed device)"));
            if let Err(e) = stream.device_mut().ungrab() {
                warn!("Failed to ungrab device {:?}, : {}", stream.device().name(), e);
                return false
            }
            device_info.is_grabbed = false;
            return true
        }
    }
}

fn convert_device_event(
    event: evdev::InputEvent,
    device: &evdev::Device,

M src/device/mod.rs => src/device/mod.rs +14 -1
@@ 1,3 1,4 @@
pub mod handles;
pub mod input;
pub mod output;
pub mod shortcut;


@@ 7,9 8,21 @@ pub mod watch;
use crate::msgs::event;

#[derive(Clone, Debug)]
pub enum GrabEvent {
    Grab,
    Ungrab,
}

#[derive(Clone, Debug)]
pub struct InputBatch {
    pub events: Vec<event::InputEvent>,
    pub is_grabbed: bool,
}

#[derive(Clone, Debug)]
pub enum Event {
    /// A group of input events to send to the active client, if any
    Input(Vec<event::InputEvent>),
    Input(InputBatch),
    /// Activate the next client (or the server) in the rotation
    SwitchNext,
    /// Activate the previous client (or the server) in the rotation

M src/device/util.rs => src/device/util.rs +21 -14
@@ 67,29 67,36 @@ pub fn axis_scale_type(axis: AbsoluteAxisType) -> AxisScale {

pub struct DeviceInfo {
    pub dims: BTreeMap<u16, (i32, i32)>,
    pub is_grabbed: bool,
}

pub fn log_device_info(device: &Device, path: &Path, log_prefix: &str, info: bool) -> DeviceInfo {
    let mut dims = BTreeMap::new();
    // For each abs axis supported by the device, record its max and min
    // Result will be something like ABS_X(0,100), ABS_Y(0,70), ABS_MT_POSITION_X(0,100) ...
    if let Some(abs_axes) = device.supported_absolute_axes() {
        if let Ok(state) = device.get_abs_state() {
            // clippy recommends this ugly way to get a loop counter
            for (i, s) in (0_u16..).zip(state.into_iter()) {
                let type_ = AbsoluteAxisType::from_index(i as usize);
                if abs_axes.contains(type_) && axis_scale_type(type_) != AxisScale::Invalid {
                    dims.insert(i, (s.minimum, s.maximum));
impl DeviceInfo {
    pub fn new(device: &Device, is_grabbed: bool) -> DeviceInfo {
        let mut dims = BTreeMap::new();
        // For each abs axis supported by the device, record its max and min
        // Result will be something like ABS_X(0,100), ABS_Y(0,70), ABS_MT_POSITION_X(0,100) ...
        if let Some(abs_axes) = device.supported_absolute_axes() {
            if let Ok(state) = device.get_abs_state() {
                // clippy recommends this ugly way to get a loop counter
                for (i, s) in (0_u16..).zip(state.into_iter()) {
                    let type_ = AbsoluteAxisType::from_index(i as usize);
                    if abs_axes.contains(type_) && axis_scale_type(type_) != AxisScale::Invalid {
                        dims.insert(i, (s.minimum, s.maximum));
                    }
                }
            }
        }
        DeviceInfo { dims, is_grabbed }
    }
}

pub fn log_device_info(device: &Device, path: &Path, device_info: &DeviceInfo, log_prefix: &str, info: bool) {
    // under info, show device name/path only
    let msg = format!(
        "{}: {} @ {}",
        log_prefix,
        device.name().unwrap_or("(Unnamed device)"),
        path.display()
        path.display(),
    );
    if info {
        info!("{}", msg);


@@ 97,12 104,12 @@ pub fn log_device_info(device: &Device, path: &Path, log_prefix: &str, info: boo
        debug!("{}", msg);
    }
    // under debug, show nikau version of device details
    debug!("Nikau device details:{}", device_info_string(device, &dims));
    debug!("Nikau device details:{}", device_info_string(device, &device_info.dims));
    // under trace, show evdev version of things too, but note that the abs values are missing:
    trace!("Evdev device details:\n{}", device);
    DeviceInfo { dims }
}

/// Summarizes an evdev InputEvent, hiding the key being pressed in the case of a key event.
pub fn log_event(event: &InputEvent) -> String {
    let kind = match event.kind() {
        InputEventKind::Key(_key) => {

M src/device/watch.rs => src/device/watch.rs +32 -122
@@ 1,15 1,13 @@
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};

use anyhow::{bail, Context, Result};
use evdev::{Device, EventStream, EventType, Key};
use evdev::{Device, EventType, Key};
use notify::Watcher;
use regex::Regex;
use tokio::sync::{broadcast, mpsc};
use tokio::task;
use tokio::sync::mpsc;
use tracing::{debug, info, trace, warn};

use crate::device::{output, util};
use crate::device::{handles, output, util};

#[derive(Debug)]
enum DeviceEventKind {


@@ 23,31 21,9 @@ struct DeviceEvent {
    pub path: PathBuf,
}

#[derive(Clone, Debug)]
pub enum GrabEvent {
    Grab,
    Ungrab,
}

pub struct DeviceHandle {
    pub handle: task::JoinHandle<()>,
}

/// Trait for watching the addition and removal of devices from the machine
pub trait DeviceHandler: Send + 'static {
    fn handle_device_stream(
        &mut self,
        events: EventStream,
        grab_rx: broadcast::Receiver<GrabEvent>,
        device_info: util::DeviceInfo,
    ) -> Result<DeviceHandle>;
}

pub async fn watch_loop<F: DeviceHandler>(
    mut handler: F,
    mut grab_tx: broadcast::Sender<GrabEvent>,
pub async fn watch_loop<H: handles::DeviceHandler>(
    mut device_handles: handles::DeviceHandles<H>,
    device_filters: Vec<Regex>,
    all_combo_keys: &HashSet<Key>,
) -> Result<()> {
    // Start watch for new and removed devices BEFORE scanning current devices.
    let (device_event_tx, mut device_event_rx): (


@@ 72,46 48,25 @@ pub async fn watch_loop<F: DeviceHandler>(
    )?;

    // Scan current devices
    let mut devices = HashMap::new();
    for (path, device) in evdev::enumerate() {
        // enumerate() already filters for 'event*' filenames
        if !compatible_device(&device, &path) {
        let device_info = util::DeviceInfo::new(&device, false);
        if !compatible_device(&device, &path, &device_info) {
            continue;
        }
        if !matches_filters(&device_filters, &device, &path) {
        if !matches_filters(&device_filters, &device, &path, &device_info) {
            continue;
        }
        if supports_any_keys(&device, all_combo_keys) {
            // TODO the device (probably a keyboard) should always be grabbed,
            // its events should then be routed to a virtual device set when the server is active
            // (not just virtual keyboard, in case the "keyboard" has non-keyboard event types)
            debug!(
                "Device supports one or more configured combo keys: {:?}",
                device.name()
            );
        }
        let device_info = util::log_device_info(&device, &path, "Listening to device", true);
        let events = start_device_stream(device, &path)?;
        devices.insert(
            path,
            handler.handle_device_stream(events, grab_tx.subscribe(), device_info)?,
        );
        device_handles.add(&path, device)?;
    }
    if devices.is_empty() {
    if device_handles.is_empty() {
        bail!("Didn't find any compatible input devices to listen to.");
    }

    // Start handler to consume new/removed device events
    loop {
        if let Some(event) = device_event_rx.recv().await {
            handle_device_event(
                &mut handler,
                &mut devices,
                &mut grab_tx,
                &device_filters,
                event,
            )
            .await;
            handle_device_event(&mut device_handles, &device_filters, event).await;
        } else {
            // Channel lost, exit
            return Ok(());


@@ 119,10 74,8 @@ pub async fn watch_loop<F: DeviceHandler>(
    }
}

async fn handle_device_event<F: DeviceHandler>(
    handler: &mut F,
    devices: &mut HashMap<PathBuf, DeviceHandle>,
    grab_tx: &mut broadcast::Sender<GrabEvent>,
async fn handle_device_event<H: handles::DeviceHandler>(
    device_handles: &mut handles::DeviceHandles<H>,
    device_filters: &Vec<Regex>,
    event: DeviceEvent,
) {


@@ 134,45 87,20 @@ async fn handle_device_event<F: DeviceHandler>(
            }
            match Device::open(&event.path) {
                Ok(device) => {
                    if !compatible_device(&device, &event.path) {
                    let device_info = util::DeviceInfo::new(&device, false);
                    if !compatible_device(&device, &event.path, &device_info) {
                        return;
                    }
                    if !matches_filters(device_filters, &device, &event.path) {
                    if !matches_filters(device_filters, &device, &event.path, &device_info) {
                        return;
                    }
                    let device_info = util::log_device_info(
                        &device,
                        &event.path,
                        "Listening to new device",
                        true,
                    );
                    match start_device_stream(device, &event.path) {
                        Ok(stream) => {
                            match handler.handle_device_stream(
                                stream,
                                grab_tx.subscribe(),
                                device_info,
                            ) {
                                Ok(join_handle) => {
                                    devices.insert(event.path, join_handle);
                                }
                                Err(e) => {
                                    warn!(
                                        "Failed to start event handler for device {}: {}",
                                        event.path.to_string_lossy(),
                                        e
                                    );
                                }
                            }
                        }
                        Err(e) => {
                            // Avoid exiting loop and aborting program if a new device fails
                            warn!(
                                "Failed to read device {}: {}",
                                event.path.to_string_lossy(),
                                e
                            );
                        }
                    // Avoid exiting loop and aborting program if a newly added device fails
                    if let Err(e) = device_handles.add(&event.path, device) {
                        warn!(
                            "Failed to set up new device {}: {}",
                            event.path.to_string_lossy(),
                            e
                        );
                    }
                }
                Err(e) => {


@@ 186,7 114,7 @@ async fn handle_device_event<F: DeviceHandler>(
            };
        }
        DeviceEventKind::Deleted => {
            if let Some(device_handle) = devices.remove(&event.path) {
            if let Some(device_handle) = device_handles.remove(&event.path) {
                info!("Removing device: {}", event.path.to_string_lossy());
                device_handle.handle.abort();
            }


@@ 206,7 134,7 @@ fn compatible_path(path: &Path) -> bool {
    is_match
}

fn compatible_device(d: &Device, path: &Path) -> bool {
fn compatible_device(d: &Device, path: &Path, device_info: &util::DeviceInfo) -> bool {
    // Avoid a situation where we're consuming our own virtual output device, risking an infinite loop.
    // This could happen if client and server are running on the same machine (e.g. for testing)
    if let Some(name) = d.name() {


@@ 236,7 164,7 @@ fn compatible_device(d: &Device, path: &Path) -> bool {
                .all(|key| key == Key::KEY_POWER || key == Key::KEY_SLEEP || key == Key::KEY_WAKEUP)
        } else {
            // Key device without any keys? Skip it
            util::log_device_info(d, path, "Ignoring KEY device lacking supported keys", false);
            util::log_device_info(d, path, device_info, "Ignoring KEY device lacking supported keys", false);
            false
        }
    } else {


@@ 244,6 172,7 @@ fn compatible_device(d: &Device, path: &Path) -> bool {
        util::log_device_info(
            d,
            path,
            device_info,
            "Ignoring device that isn't ABSOLUTE or RELATIVE or KEY",
            false,
        );


@@ 251,8 180,8 @@ fn compatible_device(d: &Device, path: &Path) -> bool {
    }
}

fn matches_filters(name_filters: &Vec<Regex>, d: &Device, path: &Path) -> bool {
    let device_name = d.name().unwrap_or("(Unnamed device)");
fn matches_filters(name_filters: &Vec<Regex>, device: &Device, path: &Path, device_info: &util::DeviceInfo) -> bool {
    let device_name = device.name().unwrap_or("(Unnamed device)");
    if name_filters.is_empty() {
        return true;
    }


@@ 263,8 192,9 @@ fn matches_filters(name_filters: &Vec<Regex>, d: &Device, path: &Path) -> bool {
    let is_match = !matches.is_empty();
    if !is_match {
        util::log_device_info(
            d,
            path,
            &device,
            &path,
            device_info,
            "Ignoring device that doesn't match --device name filters",
            true,
        );


@@ 272,26 202,6 @@ fn matches_filters(name_filters: &Vec<Regex>, d: &Device, path: &Path) -> bool {
    is_match
}

fn supports_any_keys(d: &Device, all_combo_keys: &HashSet<Key>) -> bool {
    if let Some(device_keys) = d.supported_keys() {
        for key in all_combo_keys.iter() {
            if device_keys.contains(*key) {
                return true;
            }
        }
    }
    false
}

fn start_device_stream(device: Device, path: &Path) -> Result<EventStream> {
    device.into_event_stream().with_context(|| {
        format!(
            "Failed to initialize async fd for device: {}",
            path.to_string_lossy()
        )
    })
}

async fn send_device_events(event: notify::Event, device_event_tx: &mpsc::Sender<DeviceEvent>) {
    match event.kind {
        notify::EventKind::Create(notify::event::CreateKind::File) => {

M src/main.rs => src/main.rs +11 -5
@@ 12,7 12,7 @@ use tokio::sync::{broadcast, mpsc};
use tokio::{runtime, task, time};
use tracing::{error, info, warn};

use nikau::device::{input, output, shortcut, watch, Event};
use nikau::device::{handles, input, output, shortcut, watch, Event};
use nikau::network::approval;
use nikau::{client, logging, server};



@@ 206,12 206,18 @@ async fn server(
    keys_next: &str,
    keys_prev: Option<&str>,
    keys_goto: Vec<String>,
    devices: Vec<Regex>,
    device_filters: Vec<Regex>,
    exit_secs: Option<u32>,
    verifier: Arc<approval::NikauCertVerification>,
    fingerprint: Arc<Mutex<Option<String>>>,
    max_clipboard_size_bytes: u64,
) -> Result<()> {
    // Try to set up virtual devices up-front - exit early if we aren't root
    let output_handler = output::uinput::VirtualUInputDevices::new()
        .context("Failed to create virtual devices for output, possible solutions:
- The server may need to be run as root with 'sudo -E nikau server ...' to allow creating virtual devices.
- Enable uinput and/or evdev in the kernel, check for /dev/uinput and /dev/input/")?;

    let (event_tx, event_rx): (mpsc::Sender<Event>, mpsc::Receiver<Event>) = mpsc::channel(32);

    let event_tx2 = event_tx.clone();


@@ 225,13 231,12 @@ async fn server(
    let input_handler = input::InputHandler::new(&key_combos, event_tx)?;

    let watch_handle = task::spawn(async move {
        watch::watch_loop(input_handler, grab_tx, devices, &key_combos.all_keys)
        let device_handles = handles::DeviceHandles::new(input_handler, grab_tx, key_combos.all_keys);
        watch::watch_loop(device_handles, device_filters)
            .await
            .context(
                "Failed to listen to any input devices, possible solutions:
- Are any input devices (keyboard, mouse, etc) plugged into the machine?
- The server may need to be run as root with 'sudo -E nikau server ...' to allow listening to input.
- Enable uinput and/or evdev in the kernel, check for /dev/uinput and /dev/input/
- If any '--device' filters are specified, they might be filtering out all current devices",
            )
    });


@@ 244,6 249,7 @@ async fn server(
            event_rx,
            fingerprint,
            grab_tx2,
            output_handler,
            // Max compressed clipboard size over the wire
            max_clipboard_size_bytes,
            // Max uncompressed clipboard size, just in case

M src/rotation.rs => src/rotation.rs +33 -13
@@ 9,7 9,7 @@ use tokio::sync::{broadcast, mpsc, oneshot, watch as watchchan};
use tokio::task;
use tracing::{debug, error, info, trace, warn};

use crate::device::watch;
use crate::device;
use crate::msgs::{bulk, event};
use crate::x11clipboard::{
    reader::{ClipboardReader, ClipboardTypeWatcher},


@@ 209,8 209,9 @@ pub struct ClipboardSendContentArgs {
    pub data: ClipboardData,
}

pub struct Rotation {
    grab_tx: broadcast::Sender<watch::GrabEvent>,
pub struct Rotation<O: device::output::OutputHandler> {
    grab_tx: broadcast::Sender<device::GrabEvent>,
    output_handler: O,
    clients: Vec<ClientInfo>,
    /// Use the endpoint, not the fingerprint, to uniquely identify clients.
    /// This allows situations like a client reconnecting before the old socket has closed.


@@ 223,15 224,17 @@ pub struct Rotation {
    clipboard_target: Option<ClipboardTarget>,
}

impl Rotation {
impl<O: device::output::OutputHandler> Rotation<O> {
    pub async fn new(
        grab_tx: broadcast::Sender<watch::GrabEvent>,
        grab_tx: broadcast::Sender<device::GrabEvent>,
        output_handler: O,
        local_clipboard: Option<LocalClipboard>,
    ) -> Result<Self> {
        // Init required for space to be usable
        let buf = vec![0; 1024];
        Ok(Rotation {
            grab_tx,
            output_handler,
            clients: Vec::new(),
            current_client: None,
            removed_current_client: None,


@@ 707,7 710,7 @@ impl Rotation {
            // Try to send switch{true} to the newly assigned current_client.
            // If it fails then current_client is cleaned up.
            if let Ok(()) = self
                .send_event_current(event::ServerEvent::Switch(event::SwitchEvent {
                .send_event_to_remote_client(event::ServerEvent::Switch(event::SwitchEvent {
                    enabled: true,
                }))
                .await


@@ 783,7 786,7 @@ impl Rotation {
                        "Sending clipboard types for {} to {}: {}",
                        clipboard_source, current_client, types_str
                    );
                    self.send_event_current(types_msg).await?;
                    self.send_event_to_remote_client(types_msg).await?;
                }
            } else if let Some(local_clipboard) = &mut self.local_clipboard {
                // The server is active and its clipboard support is enabled.


@@ 810,7 813,7 @@ impl Rotation {
                    "Sending clipboard types for server to {}: {}",
                    current_client, types_str
                );
                self.send_event_current(types_msg).await?;
                self.send_event_to_remote_client(types_msg).await?;
            }
        }
        Ok(())


@@ 846,12 849,11 @@ impl Rotation {

    /// Sends an event to the currently active client, removing it if sending fails.
    /// If no client is active, this does nothing.
    pub async fn send_event_current(&mut self, msg: event::ServerEvent<'_>) -> Result<()> {
    async fn send_event_to_remote_client(&mut self, msg: event::ServerEvent<'_>) -> Result<()> {
        let current_client = match self.current_client {
            Some(client) => client,
            None => {
                // Ignore input when using the local machine.
                // We continue reading the input to detect combo presses but that's it.
                // On local machine, nothing to do
                return Ok(());
            }
        };


@@ 864,6 866,24 @@ impl Rotation {
        Ok(())
    }

    /// Handles an input event collected from the server.
    pub async fn send_input_events(&mut self, batch: device::InputBatch) -> Result<()> {
        if let Some(_) = self.current_client {
            // Remote client is active, send all input to client and not to local machine.
            self.send_event_to_remote_client(event::ServerEvent::Input(batch.events)).await
        } else if batch.is_grabbed {
            // Local machine is active and device is grabbed, write input to local virtual devices.
            // For example, we grab keyboards so that we can skip sending switch combos to the local system.
            self.output_handler.write(batch.events).await
        } else {
            // Local machine is active and device isn't grabbed (passthrough), drop input event.
            // For example, we don't grab mice/touchpads since they aren't relevant to switch combos.
            // If we send their input to the handler, the input is duplicated between the passthrough
            // and the virtual device.
            Ok(())
        }
    }

    /// Sends an event to the specified client, removing it if sending fails.
    /// If the client isn't found, returns Ok(false)
    /// If sending the message fails, removes the client and returns Err


@@ 1010,9 1030,9 @@ impl Rotation {
    async fn set_and_grab_current_client(&mut self, client: Option<SocketAddr>) {
        self.current_client = client;
        let grab = if client.is_some() {
            watch::GrabEvent::Grab
            device::GrabEvent::Grab
        } else {
            watch::GrabEvent::Ungrab
            device::GrabEvent::Ungrab
        };
        if let Err(e) = self.grab_tx.send(grab) {
            // Avoid leaving devices in a bad grabbed state

M src/server.rs => src/server.rs +10 -9
@@ 7,18 7,19 @@ use tokio::sync::{broadcast, mpsc};
use tokio::task;
use tracing::{debug, error, info, trace, warn};

use crate::device::{watch, Event};
use crate::device::{output, Event, GrabEvent};
use crate::msgs::{bulk, event};
use crate::network::{approval, transport};
use crate::{rotation, x11clipboard};

pub async fn run_server(
pub async fn run_server<O: output::OutputHandler>(
    listen_addr: &SocketAddr,
    cert_verifier: Arc<approval::NikauCertVerification>,
    config_dir: PathBuf,
    mut input_rx: mpsc::Receiver<Event>,
    mut event_rx: mpsc::Receiver<Event>,
    fingerprint: Arc<Mutex<Option<String>>>,
    grab_tx: broadcast::Sender<watch::GrabEvent>,
    grab_tx: broadcast::Sender<GrabEvent>,
    output_handler: O,
    max_clipboard_size_bytes: u64,
    max_uncompressed_size_bytes: u64,
) -> Result<()> {


@@ 38,7 39,7 @@ pub async fn run_server(
        }
    };

    let mut rotation = rotation::Rotation::new(grab_tx, local_clipboard).await?;
    let mut rotation = rotation::Rotation::new(grab_tx, output_handler, local_clipboard).await?;
    let server_endpoint = transport::build_server(listen_addr, cert_verifier)?;

    loop {


@@ 52,14 53,14 @@ pub async fn run_server(
                rotation.accept(event).await;
            },
            // Listen to local system device input events
            event = input_rx.recv() => {
            event = event_rx.recv() => {
                let event = match event {
                    Some(e) => e,
                    None => bail!("input_rx is closed, exiting server"),
                    None => bail!("event_rx is closed, exiting server"),
                };
                match event {
                    Event::Input(events) => {
                        if let Err(e) = rotation.send_event_current(event::ServerEvent::Input(events)).await {
                    Event::Input(batch) => {
                        if let Err(e) = rotation.send_input_events(batch).await {
                            warn!("Failed to send input events to current client: {:?}", e);
                        }
                    }