From 1e4da6cf3200242ddc473d05c28e5935f6ece085 Mon Sep 17 00:00:00 2001 From: Nathan McCarty Date: Wed, 16 Nov 2022 02:34:10 -0500 Subject: [PATCH] feat: Add new type macros for Actors Add the async_actor and sync_actor macros to automatically create type-saftey wrappers around AsyncActor and SyncActor, respectively, given the name of the desired type, the input and output event types, the context type, and the event handling method. --- src/testing_util.rs | 189 +++++++++++++++++++++++++- src/util/async_actor.rs | 2 + src/util/async_actor/newtype_macro.rs | 77 +++++++++++ src/util/sync_actor.rs | 2 + src/util/sync_actor/newtype_macro.rs | 77 +++++++++++ 5 files changed, 346 insertions(+), 1 deletion(-) create mode 100644 src/util/async_actor/newtype_macro.rs create mode 100644 src/util/sync_actor/newtype_macro.rs diff --git a/src/testing_util.rs b/src/testing_util.rs index 395752f..817ff03 100644 --- a/src/testing_util.rs +++ b/src/testing_util.rs @@ -1,7 +1,18 @@ +#![allow(dead_code)] +use async_trait::async_trait; use enum_dispatch::enum_dispatch; use proptest_derive::Arbitrary; -use crate::util::WrappedEvent; +use crate::{ + async_actor, + executor::Executor, + sync_actor, + traits::{Actor, Event, EventConsumer, EventProducer}, + types::CompletionToken, + util::{AsyncActor, AsyncActorError, SyncActor, SyncActorError, WrappedEvent}, +}; + +// Define the Event types, here the context type is an i64 /// Dummy Event implementation for use in unit testing #[enum_dispatch] @@ -42,3 +53,179 @@ pub(crate) struct Output { pub(crate) type MathEvent = WrappedEvent; pub(crate) type OutputEvent = WrappedEvent; + +// Async actor definition + +#[allow(clippy::unused_async)] +async fn async_math_event_handler( + mut value: i64, + mut math: MathEvent, +) -> (i64, Option) { + // Pull out the completion token, if there is any + let token = math.token(); + // Perform the operation + let old_value = value; + let math = math.into_inner(); + value = math.operate(value); + // Check to see if there was a completion token, if so, send back an Output + if let Some(token) = token { + // Make our output + let output = Output { + before: old_value, + after: value, + input: math, + }; + // Wrap it up + let mut output = OutputEvent::from(output); + // Attach the token + output.set_completion_token(token); + // Send it up + (value, Some(output)) + } else { + (value, None) + } +} + +async_actor!( + AsyncMathActor, + MathEvent, + OutputEvent, + i64, + async_math_event_handler +); + +// Sync Actor Definition + +fn math_event_handler(value: &mut i64, mut math: MathEvent) -> Option { + // Pull out the completion token, if there is any + let token = math.token(); + // Perform the operation + let old_value = *value; + let math = math.into_inner(); + *value = math.operate(*value); + // Check to see if there was a completion token, if so, send back an Output + if let Some(token) = token { + // Make our output + let output = Output { + before: old_value, + after: *value, + input: math, + }; + // Wrap it up + let mut output = OutputEvent::from(output); + // Attach the token + output.set_completion_token(token); + // Send it up + Some(output) + } else { + None + } +} + +sync_actor!( + SyncMathActor, + MathEvent, + OutputEvent, + i64, + math_event_handler +); + +mod tests { + use super::*; + use crate::traits::ActorExt; + + mod async_math_actor { + + use futures::StreamExt; + + use super::*; + use crate::executor::AsyncStd; + #[async_std::test] + async fn smoke() { + let actor = AsyncMathActor::::new(0, Some(1)); + let output = actor.stream(None).await.unwrap(); + // Add some numbers to our internal count + for i in 1..=10 { + let mut event: MathEvent = MathEventType::Add(Add(i)).into(); + if i % 2 == 0 { + // Tokenize the even events for testing the event sending behavior + let _token = event.tokenize(); + } + actor.inbox().accept(event).await.unwrap(); + } + // Make sure we have the correct count + let count_event = actor + .call(MathEventType::Add(Add(0)).into()) + .await + .unwrap() + .unwrap() + .into_inner(); + assert_eq!(count_event.after, 55); + println!("Past first assert"); + // Make sure our events are as expected + let output_events = output + .stream() + .take(6) + .map(|x| x.into_inner().input) + .collect::>() + .await; + println!("Events in output: {:?}", output_events); + assert_eq!( + [2, 4, 6, 8, 10, 0] + .into_iter() + .map(|x| MathEventType::Add(Add(x))) + .collect::>(), + output_events + ); + } + } + + mod sync_math_actor { + use super::*; + use crate::executor::Threads; + + #[test] + fn smoke() { + let actor = SyncMathActor::::new(0, Some(1)); + let output = actor.stream_sync(None).unwrap(); + // Sleep for a bit to let the subscription take + std::thread::sleep(std::time::Duration::from_millis(50)); + println!("Started up actor"); + // Add some numbers to our internal count + for i in 1..=10 { + println!("Sending event {i}"); + let mut event: MathEvent = MathEventType::Add(Add(i)).into(); + if i % 2 == 0 { + // Tokenize the even events for testing the event sending behavior + let _token = event.tokenize(); + } + actor.inbox().accept_sync(event).unwrap(); + println!("Sent event {i}"); + } + // Make sure we have the correct count + println!("Getting counter"); + let count_event = actor + .call_sync(MathEventType::Add(Add(0)).into()) + .unwrap() + .unwrap() + .into_inner(); + println!("Got Counter"); + assert_eq!(count_event.after, 55); + println!("Past first assert"); + // Make sure our events are as expected + let output_events = output + .iter() + .take(6) + .map(|x| x.into_inner().input) + .collect::>(); + println!("Events in output: {:?}", output_events); + assert_eq!( + [2, 4, 6, 8, 10, 0] + .into_iter() + .map(|x| MathEventType::Add(Add(x))) + .collect::>(), + output_events + ); + } + } +} diff --git a/src/util/async_actor.rs b/src/util/async_actor.rs index 1544901..c6eaf03 100644 --- a/src/util/async_actor.rs +++ b/src/util/async_actor.rs @@ -20,6 +20,8 @@ use crate::{ types::{CompletionToken, DynamicConsumer, DynamicError}, }; +mod newtype_macro; + /// Error connecting to [`AsyncActor`] #[derive(Debug, Snafu)] #[non_exhaustive] diff --git a/src/util/async_actor/newtype_macro.rs b/src/util/async_actor/newtype_macro.rs new file mode 100644 index 0000000..8201380 --- /dev/null +++ b/src/util/async_actor/newtype_macro.rs @@ -0,0 +1,77 @@ +//! Macro for creating new actor implementations using an async actor under the hood + +/// Auto generates an async actor +#[macro_export] +macro_rules! async_actor { + ($type_name:ident, $input_type:ty, $output_type:ty, $context_type:ty, $event_method:ident) => { + #[derive(Clone)] + pub struct $type_name(AsyncActor<$input_type, $output_type, X>); + impl $type_name { + pub fn new(initial_context: $context_type, bound: Option) -> Self { + let actor = AsyncActor::spawn_async($event_method, initial_context, bound); + Self(actor) + } + } + #[async_trait] + impl EventConsumer<$input_type> for $type_name { + type Error = AsyncActorError; + async fn accept(&self, event: $input_type) -> Result<(), Self::Error> { + self.0.accept(event).await + } + fn accept_sync(&self, event: $input_type) -> Result<(), Self::Error> { + self.0.accept_sync(event) + } + } + #[async_trait] + impl EventProducer<$output_type> for $type_name { + type Error = AsyncActorError; + async fn register_consumer(&self, consumer: C) -> Result<(), Self::Error> + where + C: EventConsumer<$output_type> + Send + Sync + 'static, + { + self.0.register_consumer(consumer).await + } + + async fn register_callback( + &self, + callback: F, + token: CompletionToken, + ) -> Result<(), Self::Error> + where + F: FnOnce($output_type) + Send + Sync + 'static, + { + self.0.register_callback(callback, token).await + } + + fn register_consumer_sync(&self, consumer: C) -> Result<(), Self::Error> + where + C: EventConsumer<$output_type> + Send + Sync + 'static, + { + self.0.register_consumer_sync(consumer) + } + + fn register_callback_sync( + &self, + callback: F, + token: CompletionToken, + ) -> Result<(), Self::Error> + where + F: FnOnce($output_type) + Send + Sync + 'static, + { + self.0.register_callback_sync(callback, token) + } + } + impl Actor<$input_type, $output_type, X> for $type_name { + type Inbox = Self; + type Outbox = Self; + + fn inbox(&self) -> &Self::Inbox { + self + } + + fn outbox(&self) -> &Self::Outbox { + self + } + } + }; +} diff --git a/src/util/sync_actor.rs b/src/util/sync_actor.rs index 234b14f..90d1260 100644 --- a/src/util/sync_actor.rs +++ b/src/util/sync_actor.rs @@ -20,6 +20,8 @@ use crate::{ types::{CompletionToken, DynamicConsumer, DynamicError}, }; +mod newtype_macro; + /// Error connecting to [`SyncActor`] #[derive(Debug, Snafu)] #[non_exhaustive] diff --git a/src/util/sync_actor/newtype_macro.rs b/src/util/sync_actor/newtype_macro.rs new file mode 100644 index 0000000..1967c49 --- /dev/null +++ b/src/util/sync_actor/newtype_macro.rs @@ -0,0 +1,77 @@ +//! Macro for creating new actor implementations using an sync actor under the hood + +/// Auto generates an sync actor +#[macro_export] +macro_rules! sync_actor { + ($type_name:ident, $input_type:ty, $output_type:ty, $context_type:ty, $event_method:ident) => { + #[derive(Clone)] + pub struct $type_name(SyncActor<$input_type, $output_type, X>); + impl $type_name { + pub fn new(initial_context: $context_type, bound: Option) -> Self { + let actor = SyncActor::spawn($event_method, initial_context, bound); + Self(actor) + } + } + #[async_trait] + impl EventConsumer<$input_type> for $type_name { + type Error = SyncActorError; + async fn accept(&self, event: $input_type) -> Result<(), Self::Error> { + self.0.accept(event).await + } + fn accept_sync(&self, event: $input_type) -> Result<(), Self::Error> { + self.0.accept_sync(event) + } + } + #[async_trait] + impl EventProducer<$output_type> for $type_name { + type Error = SyncActorError; + async fn register_consumer(&self, consumer: C) -> Result<(), Self::Error> + where + C: EventConsumer<$output_type> + Send + Sync + 'static, + { + self.0.register_consumer(consumer).await + } + + async fn register_callback( + &self, + callback: F, + token: CompletionToken, + ) -> Result<(), Self::Error> + where + F: FnOnce($output_type) + Send + Sync + 'static, + { + self.0.register_callback(callback, token).await + } + + fn register_consumer_sync(&self, consumer: C) -> Result<(), Self::Error> + where + C: EventConsumer<$output_type> + Send + Sync + 'static, + { + self.0.register_consumer_sync(consumer) + } + + fn register_callback_sync( + &self, + callback: F, + token: CompletionToken, + ) -> Result<(), Self::Error> + where + F: FnOnce($output_type) + Send + Sync + 'static, + { + self.0.register_callback_sync(callback, token) + } + } + impl Actor<$input_type, $output_type, X> for $type_name { + type Inbox = Self; + type Outbox = Self; + + fn inbox(&self) -> &Self::Inbox { + self + } + + fn outbox(&self) -> &Self::Outbox { + self + } + } + }; +} -- 2.45.2