~thatonelutenist/actm

1e4da6cf3200242ddc473d05c28e5935f6ece085 — Nathan McCarty 1 year, 10 months ago 69657b1
feat: Add new type macros for Actors

Add the async_actor and sync_actor macros to automatically create
type-saftey wrappers around AsyncActor and SyncActor, respectively,
given the name of the desired type, the input and output event types,
the context type, and the event handling method.
M src/testing_util.rs => src/testing_util.rs +188 -1
@@ 1,7 1,18 @@
#![allow(dead_code)]
use async_trait::async_trait;
use enum_dispatch::enum_dispatch;
use proptest_derive::Arbitrary;

use crate::util::WrappedEvent;
use crate::{
    async_actor,
    executor::Executor,
    sync_actor,
    traits::{Actor, Event, EventConsumer, EventProducer},
    types::CompletionToken,
    util::{AsyncActor, AsyncActorError, SyncActor, SyncActorError, WrappedEvent},
};

// Define the Event types, here the context type is an i64

/// Dummy Event implementation for use in unit testing
#[enum_dispatch]


@@ 42,3 53,179 @@ pub(crate) struct Output {

pub(crate) type MathEvent = WrappedEvent<MathEventType>;
pub(crate) type OutputEvent = WrappedEvent<Output>;

// Async actor definition

#[allow(clippy::unused_async)]
async fn async_math_event_handler(
    mut value: i64,
    mut math: MathEvent,
) -> (i64, Option<OutputEvent>) {
    // Pull out the completion token, if there is any
    let token = math.token();
    // Perform the operation
    let old_value = value;
    let math = math.into_inner();
    value = math.operate(value);
    // Check to see if there was a completion token, if so, send back an Output
    if let Some(token) = token {
        // Make our output
        let output = Output {
            before: old_value,
            after: value,
            input: math,
        };
        // Wrap it up
        let mut output = OutputEvent::from(output);
        // Attach the token
        output.set_completion_token(token);
        // Send it up
        (value, Some(output))
    } else {
        (value, None)
    }
}

async_actor!(
    AsyncMathActor,
    MathEvent,
    OutputEvent,
    i64,
    async_math_event_handler
);

// Sync Actor Definition

fn math_event_handler(value: &mut i64, mut math: MathEvent) -> Option<OutputEvent> {
    // Pull out the completion token, if there is any
    let token = math.token();
    // Perform the operation
    let old_value = *value;
    let math = math.into_inner();
    *value = math.operate(*value);
    // Check to see if there was a completion token, if so, send back an Output
    if let Some(token) = token {
        // Make our output
        let output = Output {
            before: old_value,
            after: *value,
            input: math,
        };
        // Wrap it up
        let mut output = OutputEvent::from(output);
        // Attach the token
        output.set_completion_token(token);
        // Send it up
        Some(output)
    } else {
        None
    }
}

sync_actor!(
    SyncMathActor,
    MathEvent,
    OutputEvent,
    i64,
    math_event_handler
);

mod tests {
    use super::*;
    use crate::traits::ActorExt;

    mod async_math_actor {

        use futures::StreamExt;

        use super::*;
        use crate::executor::AsyncStd;
        #[async_std::test]
        async fn smoke() {
            let actor = AsyncMathActor::<AsyncStd>::new(0, Some(1));
            let output = actor.stream(None).await.unwrap();
            // Add some numbers to our internal count
            for i in 1..=10 {
                let mut event: MathEvent = MathEventType::Add(Add(i)).into();
                if i % 2 == 0 {
                    // Tokenize the even events for testing the event sending behavior
                    let _token = event.tokenize();
                }
                actor.inbox().accept(event).await.unwrap();
            }
            // Make sure we have the correct count
            let count_event = actor
                .call(MathEventType::Add(Add(0)).into())
                .await
                .unwrap()
                .unwrap()
                .into_inner();
            assert_eq!(count_event.after, 55);
            println!("Past first assert");
            // Make sure our events are as expected
            let output_events = output
                .stream()
                .take(6)
                .map(|x| x.into_inner().input)
                .collect::<Vec<_>>()
                .await;
            println!("Events in output: {:?}", output_events);
            assert_eq!(
                [2, 4, 6, 8, 10, 0]
                    .into_iter()
                    .map(|x| MathEventType::Add(Add(x)))
                    .collect::<Vec<_>>(),
                output_events
            );
        }
    }

    mod sync_math_actor {
        use super::*;
        use crate::executor::Threads;

        #[test]
        fn smoke() {
            let actor = SyncMathActor::<Threads>::new(0, Some(1));
            let output = actor.stream_sync(None).unwrap();
            // Sleep for a bit to let the subscription take
            std::thread::sleep(std::time::Duration::from_millis(50));
            println!("Started up actor");
            // Add some numbers to our internal count
            for i in 1..=10 {
                println!("Sending event {i}");
                let mut event: MathEvent = MathEventType::Add(Add(i)).into();
                if i % 2 == 0 {
                    // Tokenize the even events for testing the event sending behavior
                    let _token = event.tokenize();
                }
                actor.inbox().accept_sync(event).unwrap();
                println!("Sent event {i}");
            }
            // Make sure we have the correct count
            println!("Getting counter");
            let count_event = actor
                .call_sync(MathEventType::Add(Add(0)).into())
                .unwrap()
                .unwrap()
                .into_inner();
            println!("Got Counter");
            assert_eq!(count_event.after, 55);
            println!("Past first assert");
            // Make sure our events are as expected
            let output_events = output
                .iter()
                .take(6)
                .map(|x| x.into_inner().input)
                .collect::<Vec<_>>();
            println!("Events in output: {:?}", output_events);
            assert_eq!(
                [2, 4, 6, 8, 10, 0]
                    .into_iter()
                    .map(|x| MathEventType::Add(Add(x)))
                    .collect::<Vec<_>>(),
                output_events
            );
        }
    }
}

M src/util/async_actor.rs => src/util/async_actor.rs +2 -0
@@ 20,6 20,8 @@ use crate::{
    types::{CompletionToken, DynamicConsumer, DynamicError},
};

mod newtype_macro;

/// Error connecting to [`AsyncActor`]
#[derive(Debug, Snafu)]
#[non_exhaustive]

A src/util/async_actor/newtype_macro.rs => src/util/async_actor/newtype_macro.rs +77 -0
@@ 0,0 1,77 @@
//! Macro for creating new actor implementations using an async actor under the hood

/// Auto generates an async actor
#[macro_export]
macro_rules! async_actor {
    ($type_name:ident, $input_type:ty, $output_type:ty, $context_type:ty, $event_method:ident) => {
        #[derive(Clone)]
        pub struct $type_name<X: Executor>(AsyncActor<$input_type, $output_type, X>);
        impl<X: Executor> $type_name<X> {
            pub fn new(initial_context: $context_type, bound: Option<usize>) -> Self {
                let actor = AsyncActor::spawn_async($event_method, initial_context, bound);
                Self(actor)
            }
        }
        #[async_trait]
        impl<X: Executor> EventConsumer<$input_type> for $type_name<X> {
            type Error = AsyncActorError;
            async fn accept(&self, event: $input_type) -> Result<(), Self::Error> {
                self.0.accept(event).await
            }
            fn accept_sync(&self, event: $input_type) -> Result<(), Self::Error> {
                self.0.accept_sync(event)
            }
        }
        #[async_trait]
        impl<X: Executor> EventProducer<$output_type> for $type_name<X> {
            type Error = AsyncActorError;
            async fn register_consumer<C>(&self, consumer: C) -> Result<(), Self::Error>
            where
                C: EventConsumer<$output_type> + Send + Sync + 'static,
            {
                self.0.register_consumer(consumer).await
            }

            async fn register_callback<F>(
                &self,
                callback: F,
                token: CompletionToken,
            ) -> Result<(), Self::Error>
            where
                F: FnOnce($output_type) + Send + Sync + 'static,
            {
                self.0.register_callback(callback, token).await
            }

            fn register_consumer_sync<C>(&self, consumer: C) -> Result<(), Self::Error>
            where
                C: EventConsumer<$output_type> + Send + Sync + 'static,
            {
                self.0.register_consumer_sync(consumer)
            }

            fn register_callback_sync<F>(
                &self,
                callback: F,
                token: CompletionToken,
            ) -> Result<(), Self::Error>
            where
                F: FnOnce($output_type) + Send + Sync + 'static,
            {
                self.0.register_callback_sync(callback, token)
            }
        }
        impl<X: Executor> Actor<$input_type, $output_type, X> for $type_name<X> {
            type Inbox = Self;
            type Outbox = Self;

            fn inbox(&self) -> &Self::Inbox {
                self
            }

            fn outbox(&self) -> &Self::Outbox {
                self
            }
        }
    };
}

M src/util/sync_actor.rs => src/util/sync_actor.rs +2 -0
@@ 20,6 20,8 @@ use crate::{
    types::{CompletionToken, DynamicConsumer, DynamicError},
};

mod newtype_macro;

/// Error connecting to [`SyncActor`]
#[derive(Debug, Snafu)]
#[non_exhaustive]

A src/util/sync_actor/newtype_macro.rs => src/util/sync_actor/newtype_macro.rs +77 -0
@@ 0,0 1,77 @@
//! Macro for creating new actor implementations using an sync actor under the hood

/// Auto generates an sync actor
#[macro_export]
macro_rules! sync_actor {
    ($type_name:ident, $input_type:ty, $output_type:ty, $context_type:ty, $event_method:ident) => {
        #[derive(Clone)]
        pub struct $type_name<X: Executor>(SyncActor<$input_type, $output_type, X>);
        impl<X: Executor> $type_name<X> {
            pub fn new(initial_context: $context_type, bound: Option<usize>) -> Self {
                let actor = SyncActor::spawn($event_method, initial_context, bound);
                Self(actor)
            }
        }
        #[async_trait]
        impl<X: Executor> EventConsumer<$input_type> for $type_name<X> {
            type Error = SyncActorError;
            async fn accept(&self, event: $input_type) -> Result<(), Self::Error> {
                self.0.accept(event).await
            }
            fn accept_sync(&self, event: $input_type) -> Result<(), Self::Error> {
                self.0.accept_sync(event)
            }
        }
        #[async_trait]
        impl<X: Executor> EventProducer<$output_type> for $type_name<X> {
            type Error = SyncActorError;
            async fn register_consumer<C>(&self, consumer: C) -> Result<(), Self::Error>
            where
                C: EventConsumer<$output_type> + Send + Sync + 'static,
            {
                self.0.register_consumer(consumer).await
            }

            async fn register_callback<F>(
                &self,
                callback: F,
                token: CompletionToken,
            ) -> Result<(), Self::Error>
            where
                F: FnOnce($output_type) + Send + Sync + 'static,
            {
                self.0.register_callback(callback, token).await
            }

            fn register_consumer_sync<C>(&self, consumer: C) -> Result<(), Self::Error>
            where
                C: EventConsumer<$output_type> + Send + Sync + 'static,
            {
                self.0.register_consumer_sync(consumer)
            }

            fn register_callback_sync<F>(
                &self,
                callback: F,
                token: CompletionToken,
            ) -> Result<(), Self::Error>
            where
                F: FnOnce($output_type) + Send + Sync + 'static,
            {
                self.0.register_callback_sync(callback, token)
            }
        }
        impl<X: Executor> Actor<$input_type, $output_type, X> for $type_name<X> {
            type Inbox = Self;
            type Outbox = Self;

            fn inbox(&self) -> &Self::Inbox {
                self
            }

            fn outbox(&self) -> &Self::Outbox {
                self
            }
        }
    };
}