M src/testing_util.rs => src/testing_util.rs +8 -2
@@ 8,7 8,7 @@ use crate::{
executor::Executor,
sync_actor,
traits::{Actor, Event, EventConsumer, EventProducer},
- types::CompletionToken,
+ types::{CompletionToken, Waiter},
util::{AsyncActor, AsyncActorError, SyncActor, SyncActorError, WrappedEvent},
wrapped_event,
};
@@ 145,6 145,8 @@ mod tests {
async fn smoke() {
let actor = AsyncMathActor::<AsyncStd>::new(0, Some(1));
let output = actor.stream(None).await.unwrap();
+ // Wait for the subscribe to happen before feeding in events
+ assert!(actor.catchup().wait().await);
// Add some numbers to our internal count
for i in 1..=10 {
let mut event: MathEvent = MathEventType::Add(Add(i)).into();
@@ 178,6 180,8 @@ mod tests {
.collect::<Vec<_>>(),
output_events
);
+ // Make sure the actor shuts down
+ assert!(actor.shutdown().wait().await);
}
}
@@ 190,7 194,7 @@ mod tests {
let actor = SyncMathActor::<Threads>::new(0, Some(1));
let output = actor.stream_sync(None).unwrap();
// Sleep for a bit to let the subscription take
- std::thread::sleep(std::time::Duration::from_millis(50));
+ assert!(actor.catchup().wait_sync());
println!("Started up actor");
// Add some numbers to our internal count
for i in 1..=10 {
@@ 227,6 231,8 @@ mod tests {
.collect::<Vec<_>>(),
output_events
);
+ // Make sure the actor shuts down properly
+ assert!(actor.shutdown().wait_sync());
}
}
}
M src/traits.rs => src/traits.rs +9 -0
@@ 10,6 10,7 @@ use tracing::{error, instrument, warn};
use super::types::CompletionToken;
use crate::{
executor::Executor,
+ types::Waiter,
util::either_error::{EitherError, LeftSnafu, RightSnafu},
};
@@ 204,6 205,14 @@ pub trait Actor<I: Event, O: Event, E: Executor>: Send + Sync + 'static {
/// Returns a new handle to the associated `Outbox` for this `Actor`, this should be owned so it
/// can be easily passed around between threads
fn outbox(&self) -> &Self::Outbox;
+
+ /// Shutdown the `Actor`, waiting for all currently in flight events to be processed, and return
+ /// a [`Waiter`] that will be signaled when the `Actor` is shutdown
+ fn shutdown(&self) -> Waiter;
+
+ /// Process all currently in flight events in a batch, and return a [`Waiter`] that will be
+ /// signaled when the `Actor` is caught up
+ fn catchup(&self) -> Waiter;
}
/// Extension trait providing utility methods for [`Actor`]
M src/util/async_actor.rs => src/util/async_actor.rs +117 -8
@@ 17,7 17,7 @@ use super::TokenManager;
use crate::{
executor::Executor,
traits::{Actor, Event, EventConsumer, EventProducer},
- types::{CompletionToken, DynamicConsumer, DynamicError},
+ types::{CompletionToken, DynamicConsumer, DynamicError, Trigger, Waiter},
};
mod newtype_macro;
@@ 39,6 39,10 @@ pub struct AsyncActor<I: Event, O: Event, X: Executor> {
/// A channel to register callbacks over
#[allow(clippy::type_complexity)] // This type isn't actually that complex
callbacks: Sender<(CompletionToken, Box<dyn FnOnce(O) + Send + Sync + 'static>)>,
+ /// A channel for shutdown triggers
+ shutdown: Sender<Trigger>,
+ /// A channel for catch up triggers
+ catchup: Sender<Trigger>,
/// Phantom for the executor type
_executor: PhantomData<X>,
}
@@ 51,6 55,8 @@ impl<X: Executor, I: Event, O: Event> Clone for AsyncActor<I, O, X> {
inbox: self.inbox.clone(),
consumers: self.consumers.clone(),
callbacks: self.callbacks.clone(),
+ shutdown: self.shutdown.clone(),
+ catchup: self.catchup.clone(),
_executor: PhantomData,
}
}
@@ 121,6 127,8 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
Some(x) => flume::bounded(x),
None => flume::unbounded(),
};
+ let (shutdown_tx, shutdown_rx) = flume::bounded::<Trigger>(1);
+ let (catchup_tx, catchup_rx) = flume::bounded::<Trigger>(1);
// Get a task id
let id = COUNTER.fetch_add(1, Ordering::SeqCst);
info!(?id, "Spawning BasicActor task");
@@ 131,13 139,16 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
// We will use this intercept all outbound
let mut token_manager: TokenManager<O> = TokenManager::new();
// Convert channels into streams
- let mut inbox = inbox_rx.into_stream();
- let mut consumers_inbox = consumers_rx.into_stream();
- let mut callbacks = callbacks_rx.into_stream();
+ let mut inbox_stream = inbox_rx.clone().into_stream();
+ let mut consumers_inbox_stream = consumers_rx.clone().into_stream();
+ let mut callbacks_stream = callbacks_rx.clone().into_stream();
+ let mut shutdown_stream = shutdown_rx.into_stream();
+ let mut catchup_stream = catchup_rx.into_stream();
// Store our consumers
let mut consumers = Vec::<Box<dyn EventConsumer<O, Error = DynamicError>>>::new();
// Enter our event handling loop
loop {
+ // Check for a shutdown signal
// Select over our three inboxes. In each branch, we will log an error and break
// out of the loop, implicitly closing down the task, if the other side of the
// stream has been closed
@@ 146,7 157,7 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
// ""semi-random"" way, this should give roughly equal attention to all the
// inputs under load
select! {
- x = inbox.next() => {
+ x = inbox_stream.next() => {
if let Some(x) = x {
// Process the event with the caller provided closure and context,
// and then perform bookkeeping if the closure returns an outbound
@@ 158,7 169,11 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
// callbacks can be called back
let output = token_manager.process(output);
// Then distribute it to all of our consumers
- let results = join_all(consumers.iter_mut().map(|x| async { x.accept(output.stateless_clone()).await})).await;
+ let results = join_all(
+ consumers
+ .iter_mut()
+ .map(|x| async { x.accept(output.stateless_clone()).await})
+ ).await;
// Log all of our errors
for result in results {
if let Err(e) = result {
@@ 171,7 186,7 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
break;
}
}
- x = consumers_inbox.next() => {
+ x = consumers_inbox_stream.next() => {
if let Some(x) = x {
// Add the new consumer to the list
consumers.push(x);
@@ 180,7 195,7 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
break;
}
}
- x = callbacks.next() => {
+ x = callbacks_stream.next() => {
if let Some(x) = x {
// Register the callback with the token manager
token_manager.register_callback(x.1, x.0);
@@ 189,6 204,87 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
break;
}
}
+ shutdown = shutdown_stream.next() => {
+ if let Some(shutdown) = shutdown {
+ // Drain the queues
+ let inbox = inbox_rx.drain().collect::<Vec<_>>();
+ let consumers_inbox = consumers_rx.drain().collect::<Vec<_>>();
+ let callbacks = callbacks_rx.drain().collect::<Vec<_>>();
+ // Handle connecting the remaining callbacks and consumers first
+ for callback in callbacks {
+ token_manager.register_callback(callback.1, callback.0);
+ }
+ for consumer in consumers_inbox {
+ consumers.push(consumer);
+ }
+ // Then handle any remaining events
+ for event in inbox {
+ let (new_context, output) = logic(context, event).await;
+ context = new_context;
+ if let Some(output) = output {
+ // First process the event through the token manager, so any
+ // callbacks can be called back
+ let output = token_manager.process(output);
+ // Then distribute it to all of our consumers
+ let results = join_all(
+ consumers
+ .iter_mut()
+ .map(|x| async { x.accept(output.stateless_clone()).await})
+ ).await;
+ // Log all of our errors
+ for result in results {
+ if let Err(e) = result {
+ warn!(?e, "Error occurred feeding event into consumer");
+ }
+ }
+ }
+
+ }
+ // Tell the caller we are done
+ shutdown.trigger();
+ break;
+ }
+ }
+ catchup = catchup_stream.next() => {
+ if let Some(catchup) = catchup {
+ // Drain the queues
+ let inbox = inbox_rx.drain().collect::<Vec<_>>();
+ let consumers_inbox = consumers_rx.drain().collect::<Vec<_>>();
+ let callbacks = callbacks_rx.drain().collect::<Vec<_>>();
+ // Handle connecting the remaining callbacks and consumers first
+ for callback in callbacks {
+ token_manager.register_callback(callback.1, callback.0);
+ }
+ for consumer in consumers_inbox {
+ consumers.push(consumer);
+ }
+ // Then handle any remaining events
+ for event in inbox {
+ let (new_context, output) = logic(context, event).await;
+ context = new_context;
+ if let Some(output) = output {
+ // First process the event through the token manager, so any
+ // callbacks can be called back
+ let output = token_manager.process(output);
+ // Then distribute it to all of our consumers
+ let results = join_all(
+ consumers
+ .iter_mut()
+ .map(|x| async { x.accept(output.stateless_clone()).await})
+ ).await;
+ // Log all of our errors
+ for result in results {
+ if let Err(e) = result {
+ warn!(?e, "Error occurred feeding event into consumer");
+ }
+ }
+ }
+
+ }
+ // Tell the caller we are done
+ catchup.trigger();
+ }
+ }
complete => {
break;
}
@@ 202,6 298,8 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
inbox: inbox_tx,
consumers: consumers_tx,
callbacks: callbacks_tx,
+ shutdown: shutdown_tx,
+ catchup: catchup_tx,
_executor: PhantomData,
}
}
@@ 287,6 385,17 @@ impl<X: Executor, I: Event, O: Event> Actor<I, O, X> for AsyncActor<I, O, X> {
fn outbox(&self) -> &Self::Outbox {
self
}
+
+ fn shutdown(&self) -> Waiter {
+ let (waiter, trigger) = Waiter::new();
+ let _res = self.shutdown.send(trigger);
+ waiter
+ }
+ fn catchup(&self) -> Waiter {
+ let (waiter, trigger) = Waiter::new();
+ let _res = self.catchup.send(trigger);
+ waiter
+ }
}
#[cfg(test)]
M src/util/async_actor/newtype_macro.rs => src/util/async_actor/newtype_macro.rs +8 -0
@@ 72,6 72,14 @@ macro_rules! async_actor {
fn outbox(&self) -> &Self::Outbox {
self
}
+
+ fn shutdown(&self) -> Waiter {
+ self.0.shutdown()
+ }
+
+ fn catchup(&self) -> Waiter {
+ self.0.catchup()
+ }
}
};
}
M src/util/sync_actor.rs => src/util/sync_actor.rs +114 -4
@@ 7,7 7,7 @@ use std::{
};
use async_trait::async_trait;
-use flume::Sender;
+use flume::{Receiver, Sender};
use futures::future::join_all;
use once_cell::sync::Lazy;
use snafu::Snafu;
@@ 17,7 17,7 @@ use super::TokenManager;
use crate::{
executor::Executor,
traits::{Actor, Event, EventConsumer, EventProducer},
- types::{CompletionToken, DynamicConsumer, DynamicError},
+ types::{CompletionToken, DynamicConsumer, DynamicError, Trigger, Waiter},
};
mod newtype_macro;
@@ 39,6 39,10 @@ pub struct SyncActor<I: Event, O: Event, X: Executor> {
/// A channel to register callbacks over
#[allow(clippy::type_complexity)] // This type isn't actually that complex
callbacks: Sender<(CompletionToken, Box<dyn FnOnce(O) + Send + Sync + 'static>)>,
+ /// A channel for shutdown triggers
+ shutdown: Sender<Trigger>,
+ /// A channel for catch up triggers
+ catchup: Sender<Trigger>,
/// Phantom for the executor type
_executor: PhantomData<X>,
}
@@ 51,6 55,8 @@ impl<X: Executor, I: Event, O: Event> Clone for SyncActor<I, O, X> {
inbox: self.inbox.clone(),
consumers: self.consumers.clone(),
callbacks: self.callbacks.clone(),
+ shutdown: self.shutdown.clone(),
+ catchup: self.catchup.clone(),
_executor: PhantomData,
}
}
@@ 74,7 80,7 @@ impl<X: Executor, I: Event, O: Event> SyncActor<I, O, X> {
/// If `limit` is `Some(_)`, then a bounded queue with the specified limit will be created,
/// otherwise an unbounded queue will be used.
#[instrument(skip(logic, context))]
- pub fn spawn<F, C>(mut logic: F, mut context: C, bound: Option<usize>) -> Self
+ pub fn spawn<F, C>(logic: F, context: C, bound: Option<usize>) -> Self
where
F: FnMut(&mut C, I) -> Option<O> + Send + 'static,
C: Send + Sync + 'static,
@@ 101,6 107,8 @@ impl<X: Executor, I: Event, O: Event> SyncActor<I, O, X> {
Some(x) => flume::bounded(x),
None => flume::unbounded(),
};
+ let (shutdown_tx, shutdown_rx) = flume::bounded(1);
+ let (catchup_tx, catchup_rx) = flume::bounded(1);
// Get a task id
let id = COUNTER.fetch_add(1, Ordering::SeqCst);
info!(?id, "Spawning BasicActor task");
@@ 114,9 122,14 @@ impl<X: Executor, I: Event, O: Event> SyncActor<I, O, X> {
let inbox = inbox_rx;
let consumers_inbox = consumers_rx;
let callbacks = callbacks_rx;
+ let shutdown_channel: Receiver<Trigger> = shutdown_rx;
+ let catchup: Receiver<Trigger> = catchup_rx;
// Store our consumers
let consumers =
RefCell::new(Vec::<Box<dyn EventConsumer<O, Error = DynamicError>>>::new());
+ // Wrap the logic and context in a cell so we can borrow it multiple places
+ let logic = RefCell::new(logic);
+ let context = RefCell::new(context);
// Enter our event handling loop
let shutdown = RefCell::new(false);
loop {
@@ 133,7 146,7 @@ impl<X: Executor, I: Event, O: Event> SyncActor<I, O, X> {
flume::Selector::new()
.recv(&inbox, |i| match i {
Ok(i) => {
- if let Some(output) = logic(&mut context, i) {
+ if let Some(output) = logic.borrow_mut()(&mut context.borrow_mut(), i) {
// First process the event through the token manager, so any
// callbacks can be called back
let output = token_manager.borrow_mut().process(output);
@@ 171,6 184,89 @@ impl<X: Executor, I: Event, O: Event> SyncActor<I, O, X> {
*shutdown.borrow_mut() = true;
}
})
+ .recv(&shutdown_channel, |c| {
+ if let Ok(c) = c {
+ let inbox = inbox.drain().collect::<Vec<_>>();
+ let consumers_inbox = consumers_inbox.drain().collect::<Vec<_>>();
+ let callbacks = callbacks.drain().collect::<Vec<_>>();
+ // Handle connecting the remaining callbacks and consumers first
+ for callback in callbacks {
+ token_manager
+ .borrow_mut()
+ .register_callback(callback.1, callback.0);
+ }
+ for consumer in consumers_inbox {
+ consumers.borrow_mut().push(consumer);
+ }
+ // Then handle any remaining events
+ for event in inbox {
+ if let Some(output) =
+ logic.borrow_mut()(&mut context.borrow_mut(), event)
+ {
+ // First process the event through the token manager, so any
+ // callbacks can be called back
+ let output = token_manager.borrow_mut().process(output);
+ // Then distribute it to all of our consumers, using some cheeky
+ // async here to send out to all the consumers concurrently
+ let results = futures::executor::block_on(join_all(
+ consumers.borrow_mut().iter_mut().map(|x| async {
+ x.accept(output.stateless_clone()).await
+ }),
+ ));
+ // Log all of our errors
+ for result in results {
+ if let Err(e) = result {
+ warn!(?e, "Error occurred feeding event into consumer");
+ }
+ }
+ }
+ }
+ // Now we can shutdown the executor and signal completion
+ *shutdown.borrow_mut() = true;
+ c.trigger();
+ }
+ })
+ .recv(&catchup, |c| {
+ if let Ok(c) = c {
+ let inbox = inbox.drain().collect::<Vec<_>>();
+ let consumers_inbox = consumers_inbox.drain().collect::<Vec<_>>();
+ let callbacks = callbacks.drain().collect::<Vec<_>>();
+ // Handle connecting the remaining callbacks and consumers first
+ for callback in callbacks {
+ token_manager
+ .borrow_mut()
+ .register_callback(callback.1, callback.0);
+ }
+ for consumer in consumers_inbox {
+ consumers.borrow_mut().push(consumer);
+ }
+ // Then handle any remaining events
+ for event in inbox {
+ if let Some(output) =
+ logic.borrow_mut()(&mut context.borrow_mut(), event)
+ {
+ // First process the event through the token manager, so any
+ // callbacks can be called back
+ let output = token_manager.borrow_mut().process(output);
+ // Then distribute it to all of our consumers, using some cheeky
+ // async here to send out to all the consumers concurrently
+ let results = futures::executor::block_on(join_all(
+ consumers.borrow_mut().iter_mut().map(|x| async {
+ x.accept(output.stateless_clone()).await
+ }),
+ ));
+ // Log all of our errors
+ for result in results {
+ if let Err(e) = result {
+ warn!(?e, "Error occurred feeding event into consumer");
+ }
+ }
+ }
+ }
+ // Now we can signal completion
+ c.trigger();
+ }
+ })
.wait();
}
});
@@ 178,6 274,8 @@ impl<X: Executor, I: Event, O: Event> SyncActor<I, O, X> {
inbox: inbox_tx,
consumers: consumers_tx,
callbacks: callbacks_tx,
+ shutdown: shutdown_tx,
+ catchup: catchup_tx,
_executor: PhantomData,
}
}
@@ 259,6 357,18 @@ impl<X: Executor, I: Event, O: Event> Actor<I, O, X> for SyncActor<I, O, X> {
fn outbox(&self) -> &Self::Outbox {
self
}
+
+ fn shutdown(&self) -> Waiter {
+ let (waiter, trigger) = Waiter::new();
+ let _res = self.shutdown.send(trigger);
+ waiter
+ }
+
+ fn catchup(&self) -> Waiter {
+ let (waiter, trigger) = Waiter::new();
+ let _res = self.catchup.send(trigger);
+ waiter
+ }
}
#[cfg(test)]
M src/util/sync_actor/newtype_macro.rs => src/util/sync_actor/newtype_macro.rs +8 -0
@@ 72,6 72,14 @@ macro_rules! sync_actor {
fn outbox(&self) -> &Self::Outbox {
self
}
+
+ fn shutdown(&self) -> Waiter {
+ self.0.shutdown()
+ }
+
+ fn catchup(&self) -> Waiter {
+ self.0.catchup()
+ }
}
};
}