~thatonelutenist/actm

30729fe3e6335f9c90de2e8224f1b8467bcb1a75 — Nathan McCarty 1 year, 9 months ago 575fb50
feat!: Remove collector

BREAKING CHANGE: Remove unneeded Collector type
4 files changed, 7 insertions(+), 87 deletions(-)

M src/util.rs
M src/util/async_actor.rs
D src/util/collector.rs
M src/util/sync_actor.rs
M src/util.rs => src/util.rs +0 -2
@@ 1,7 1,6 @@
//! Utility types and wrappers for working with events

mod async_actor;
mod collector;
mod consumer_adaptor;
pub(crate) mod either_error;
mod sync_actor;


@@ 10,7 9,6 @@ mod wrapped_channels;
mod wrapped_event;

pub use async_actor::{AsyncActor, AsyncActorError};
pub use collector::{Collector, CollectorOutput};
pub use consumer_adaptor::Adaptor;
pub use either_error::EitherError;
pub use sync_actor::{SyncActor, SyncActorError};

M src/util/async_actor.rs => src/util/async_actor.rs +4 -6
@@ 404,7 404,7 @@ mod tests {
    use crate::{
        executor::Threads,
        testing_util::{Add, Math, MathEvent, MathEventType, Output, OutputEvent},
        util::Collector,
        traits::ActorExt,
    };
    // Basic smoke test, increment the counter by one a few times
    async fn smoke<X: Executor>() {


@@ 476,8 476,7 @@ mod tests {
        )
        .await;
        // Hook up our collector
        let (collector, collector_out) = Collector::<OutputEvent>::new(None);
        actor.outbox().register_consumer(collector).await.unwrap();
        let collector_out = actor.stream(None).await.unwrap();
        // Spawn up some tasks to fill our actor's inbox
        let _tasks = join_all(events.into_iter().map(|x| {
            // Same cheeky cloning of the actor


@@ 576,8 575,7 @@ mod tests {
            events.push(event);
        }
        // Hook up our collector
        let (collector, collector_out) = Collector::<OutputEvent>::new(None);
        actor.outbox().register_consumer_sync(collector).unwrap();
        let collector_out = actor.stream_sync(None).unwrap();
        // Fill our actor's inbox with some threads
        for x in events {
            // Some cheeky cloning of the actor


@@ 585,7 583,7 @@ mod tests {
            std::thread::spawn(move || actor.inbox().accept_sync(x).unwrap());
        }
        // Pull out of our collector and sort the results
        let mut collector_out: Vec<_> = collector_out.channel.into_iter().take(100).collect();
        let mut collector_out: Vec<_> = collector_out.into_iter().take(100).collect();
        collector_out.sort();
        // Pull out of our callback stream and sort the results
        let mut callbacks: Vec<_> = rx.into_iter().take(100).collect();

D src/util/collector.rs => src/util/collector.rs +0 -75
@@ 1,75 0,0 @@
//! Utility type for collecting from an actor into a stream

use async_trait::async_trait;
use flume::{Receiver, Sender};
use futures::Stream;
use snafu::Snafu;

use crate::traits::{Event, EventConsumer};

/// Wrapper around a queue implementing [`EventConsumer`], allowing collecting events from an
/// [`Actor`](crate::traits::Actor) into a stream. This is the input side
pub struct Collector<T> {
    /// Backing channel
    channel: Sender<T>,
}

// This needs to be explicit to work around limitations in the derive macro
impl<T> Clone for Collector<T> {
    fn clone(&self) -> Self {
        Self {
            channel: self.channel.clone(),
        }
    }
}

impl<T> Collector<T> {
    /// Create a new `Collector` and its matching output
    pub fn new(limit: Option<usize>) -> (Self, CollectorOutput<T>) {
        let (tx, rx) = match limit {
            Some(x) => flume::bounded(x),
            None => flume::unbounded(),
        };
        (Self { channel: tx }, CollectorOutput { channel: rx })
    }
}

/// Errors that can happen while interacting with a collector
#[derive(Debug, Snafu)]
#[non_exhaustive]
pub enum CollectorError {
    /// Channel disconnected
    Disconnected,
}

#[async_trait]
impl<T: Event> EventConsumer<T> for Collector<T> {
    type Error = CollectorError;
    async fn accept(&self, event: T) -> Result<(), Self::Error> {
        self.channel
            .send_async(event)
            .await
            .map_err(|_| CollectorError::Disconnected)
    }
    fn accept_sync(&self, event: T) -> Result<(), Self::Error> {
        self.channel
            .send(event)
            .map_err(|_| CollectorError::Disconnected)
    }
}

/// Wrapper around a queue implementing [`EventConsumer`], allowing collecting events from an
/// [`Actor`](crate::traits::Actor) into a stream. This is the output side
pub struct CollectorOutput<T> {
    /// Backing channel
    ///
    /// Only `pub(crate)` for unit testing, do not mess with otherwise
    pub(crate) channel: Receiver<T>,
}

impl<T: 'static> CollectorOutput<T> {
    /// Consumes this wrapper and returns a [`Stream`]
    pub fn into_stream(self) -> impl Stream<Item = T> {
        self.channel.into_stream()
    }
}

M src/util/sync_actor.rs => src/util/sync_actor.rs +3 -4
@@ 377,7 377,7 @@ mod tests {
    use crate::{
        executor::Threads,
        testing_util::{Add, Math, MathEvent, MathEventType, Output, OutputEvent},
        util::Collector,
        traits::ActorExt,
    };

    // Basic smoke test, increment the counter by one a few times


@@ 438,8 438,7 @@ mod tests {
        }
        println!("Pushed callbacks into actor");
        // Hook up our collector
        let (collector, collector_out) = Collector::<OutputEvent>::new(None);
        actor.outbox().register_consumer_sync(collector).unwrap();
        let collector_out = actor.stream_sync(None).unwrap();
        println!("Hooked up collector");
        // Fill our actor's inbox with some threads
        for x in events {


@@ 449,7 448,7 @@ mod tests {
        }
        println!("Spawned actor filling threads");
        // Pull out of our collector and sort the results
        let mut collector_out: Vec<_> = collector_out.channel.iter().take(100).collect();
        let mut collector_out: Vec<_> = collector_out.iter().take(100).collect();
        collector_out.sort();
        println!("Pulled out of collector");
        // Pull out of our callback stream and sort the results