From 30729fe3e6335f9c90de2e8224f1b8467bcb1a75 Mon Sep 17 00:00:00 2001 From: Nathan McCarty Date: Thu, 24 Nov 2022 03:16:57 -0500 Subject: [PATCH] feat!: Remove collector BREAKING CHANGE: Remove unneeded Collector type --- src/util.rs | 2 -- src/util/async_actor.rs | 10 +++--- src/util/collector.rs | 75 ----------------------------------------- src/util/sync_actor.rs | 7 ++-- 4 files changed, 7 insertions(+), 87 deletions(-) delete mode 100644 src/util/collector.rs diff --git a/src/util.rs b/src/util.rs index e605c72..632911a 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,7 +1,6 @@ //! Utility types and wrappers for working with events mod async_actor; -mod collector; mod consumer_adaptor; pub(crate) mod either_error; mod sync_actor; @@ -10,7 +9,6 @@ mod wrapped_channels; mod wrapped_event; pub use async_actor::{AsyncActor, AsyncActorError}; -pub use collector::{Collector, CollectorOutput}; pub use consumer_adaptor::Adaptor; pub use either_error::EitherError; pub use sync_actor::{SyncActor, SyncActorError}; diff --git a/src/util/async_actor.rs b/src/util/async_actor.rs index 8d4479c..6d7bd2f 100644 --- a/src/util/async_actor.rs +++ b/src/util/async_actor.rs @@ -404,7 +404,7 @@ mod tests { use crate::{ executor::Threads, testing_util::{Add, Math, MathEvent, MathEventType, Output, OutputEvent}, - util::Collector, + traits::ActorExt, }; // Basic smoke test, increment the counter by one a few times async fn smoke() { @@ -476,8 +476,7 @@ mod tests { ) .await; // Hook up our collector - let (collector, collector_out) = Collector::::new(None); - actor.outbox().register_consumer(collector).await.unwrap(); + let collector_out = actor.stream(None).await.unwrap(); // Spawn up some tasks to fill our actor's inbox let _tasks = join_all(events.into_iter().map(|x| { // Same cheeky cloning of the actor @@ -576,8 +575,7 @@ mod tests { events.push(event); } // Hook up our collector - let (collector, collector_out) = Collector::::new(None); - actor.outbox().register_consumer_sync(collector).unwrap(); + let collector_out = actor.stream_sync(None).unwrap(); // Fill our actor's inbox with some threads for x in events { // Some cheeky cloning of the actor @@ -585,7 +583,7 @@ mod tests { std::thread::spawn(move || actor.inbox().accept_sync(x).unwrap()); } // Pull out of our collector and sort the results - let mut collector_out: Vec<_> = collector_out.channel.into_iter().take(100).collect(); + let mut collector_out: Vec<_> = collector_out.into_iter().take(100).collect(); collector_out.sort(); // Pull out of our callback stream and sort the results let mut callbacks: Vec<_> = rx.into_iter().take(100).collect(); diff --git a/src/util/collector.rs b/src/util/collector.rs deleted file mode 100644 index 48d0797..0000000 --- a/src/util/collector.rs +++ /dev/null @@ -1,75 +0,0 @@ -//! Utility type for collecting from an actor into a stream - -use async_trait::async_trait; -use flume::{Receiver, Sender}; -use futures::Stream; -use snafu::Snafu; - -use crate::traits::{Event, EventConsumer}; - -/// Wrapper around a queue implementing [`EventConsumer`], allowing collecting events from an -/// [`Actor`](crate::traits::Actor) into a stream. This is the input side -pub struct Collector { - /// Backing channel - channel: Sender, -} - -// This needs to be explicit to work around limitations in the derive macro -impl Clone for Collector { - fn clone(&self) -> Self { - Self { - channel: self.channel.clone(), - } - } -} - -impl Collector { - /// Create a new `Collector` and its matching output - pub fn new(limit: Option) -> (Self, CollectorOutput) { - let (tx, rx) = match limit { - Some(x) => flume::bounded(x), - None => flume::unbounded(), - }; - (Self { channel: tx }, CollectorOutput { channel: rx }) - } -} - -/// Errors that can happen while interacting with a collector -#[derive(Debug, Snafu)] -#[non_exhaustive] -pub enum CollectorError { - /// Channel disconnected - Disconnected, -} - -#[async_trait] -impl EventConsumer for Collector { - type Error = CollectorError; - async fn accept(&self, event: T) -> Result<(), Self::Error> { - self.channel - .send_async(event) - .await - .map_err(|_| CollectorError::Disconnected) - } - fn accept_sync(&self, event: T) -> Result<(), Self::Error> { - self.channel - .send(event) - .map_err(|_| CollectorError::Disconnected) - } -} - -/// Wrapper around a queue implementing [`EventConsumer`], allowing collecting events from an -/// [`Actor`](crate::traits::Actor) into a stream. This is the output side -pub struct CollectorOutput { - /// Backing channel - /// - /// Only `pub(crate)` for unit testing, do not mess with otherwise - pub(crate) channel: Receiver, -} - -impl CollectorOutput { - /// Consumes this wrapper and returns a [`Stream`] - pub fn into_stream(self) -> impl Stream { - self.channel.into_stream() - } -} diff --git a/src/util/sync_actor.rs b/src/util/sync_actor.rs index 90e4b97..34b621e 100644 --- a/src/util/sync_actor.rs +++ b/src/util/sync_actor.rs @@ -377,7 +377,7 @@ mod tests { use crate::{ executor::Threads, testing_util::{Add, Math, MathEvent, MathEventType, Output, OutputEvent}, - util::Collector, + traits::ActorExt, }; // Basic smoke test, increment the counter by one a few times @@ -438,8 +438,7 @@ mod tests { } println!("Pushed callbacks into actor"); // Hook up our collector - let (collector, collector_out) = Collector::::new(None); - actor.outbox().register_consumer_sync(collector).unwrap(); + let collector_out = actor.stream_sync(None).unwrap(); println!("Hooked up collector"); // Fill our actor's inbox with some threads for x in events { @@ -449,7 +448,7 @@ mod tests { } println!("Spawned actor filling threads"); // Pull out of our collector and sort the results - let mut collector_out: Vec<_> = collector_out.channel.iter().take(100).collect(); + let mut collector_out: Vec<_> = collector_out.iter().take(100).collect(); collector_out.sort(); println!("Pulled out of collector"); // Pull out of our callback stream and sort the results -- 2.45.2