~thatonelutenist/actm

69657b11139d9db62ad1a979a94b5287055aa5ae — Nathan McCarty 1 year, 9 months ago 0309947
refactor!: Use owned context for AsyncActor

Make AsyncActor use a logic closure that takes in an owned context and
passes it back out, this is more complicated to program, but saves
dealing with lifetime hell
2 files changed, 25 insertions(+), 23 deletions(-)

M src/util/async_actor.rs
M src/util/wrapped_channels.rs
M src/util/async_actor.rs => src/util/async_actor.rs +20 -18
@@ 71,9 71,9 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
    /// If `limit` is `Some(_)`, then a bounded queue with the specified limit will be created,
    /// otherwise an unbounded queue will be used.
    #[instrument(skip(logic, context))]
    pub fn spawn<F, C>(mut logic: F, context: C, bound: Option<usize>) -> Self
    pub fn spawn<F, C>(logic: F, context: C, bound: Option<usize>) -> Self
    where
        F: FnMut(&mut C, I) -> Option<O> + Send + Sync + 'static,
        F: Fn(C, I) -> (C, Option<O>) + Send + Sync + 'static,
        C: Send + Sync + 'static,
    {
        Self::spawn_async(


@@ 91,10 91,10 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
    ///
    /// Otherwise behaves identically to [`AsyncActor::spawn`].
    #[instrument(skip(logic, context))]
    pub fn spawn_async<R, F, C>(mut logic: F, mut context: C, bound: Option<usize>) -> Self
    pub fn spawn_async<R, F, C>(logic: F, mut context: C, bound: Option<usize>) -> Self
    where
        R: Future<Output = Option<O>> + Send + 'static,
        F: FnMut(&mut C, I) -> R + Send + 'static,
        R: Future<Output = (C, Option<O>)> + Send,
        F: Fn(C, I) -> R + Send + Sync + 'static,
        C: Send + Sync + 'static,
    {
        /// Counter for generating logging ids


@@ 149,7 149,9 @@ impl<X: Executor, I: Event, O: Event> AsyncActor<I, O, X> {
                                // Process the event with the caller provided closure and context,
                                // and then perform bookkeeping if the closure returns an outbound
                                // event
                                if let Some(output) = logic(&mut context, x).await {
                                let (new_context, output) = logic(context, x).await;
                                context = new_context;
                                if let Some(output) = output {
                                    // First process the event through the token manager, so any
                                    // callbacks can be called back
                                    let output = token_manager.process(output);


@@ 297,19 299,19 @@ mod tests {
    async fn smoke<X: Executor>() {
        // Create our actor
        let actor: AsyncActor<MathEvent, OutputEvent, X> = AsyncActor::spawn(
            |value: &mut i64, mut math: MathEvent| {
            |mut value: i64, mut math: MathEvent| {
                // Pull out the completion token, if there is any
                let token = math.token();
                // Perform the operation
                let old_value = *value;
                let old_value = value;
                let math = math.into_inner();
                *value = math.operate(*value);
                value = math.operate(value);
                // Check to see if there was a completion token, if so, send back an Output
                if let Some(token) = token {
                    // Make our output
                    let output = Output {
                        before: old_value,
                        after: *value,
                        after: value,
                        input: math,
                    };
                    // Wrap it up


@@ 317,9 319,9 @@ mod tests {
                    // Attach the token
                    output.set_completion_token(token);
                    // Send it up
                    Some(output)
                    (value, Some(output))
                } else {
                    None
                    (value, None)
                }
            },
            0,


@@ 410,19 412,19 @@ mod tests {
    fn smoke_threads_sync() {
        // Create our actor
        let actor: AsyncActor<MathEvent, OutputEvent, Threads> = AsyncActor::spawn(
            |value: &mut i64, mut math: MathEvent| {
            |mut value: i64, mut math: MathEvent| {
                // Pull out the completion token, if there is any
                let token = math.token();
                // Perform the operation
                let old_value = *value;
                let old_value = value;
                let math = math.into_inner();
                *value = math.operate(*value);
                value = math.operate(value);
                // Check to see if there was a completion token, if so, send back an Output
                if let Some(token) = token {
                    // Make our output
                    let output = Output {
                        before: old_value,
                        after: *value,
                        after: value,
                        input: math,
                    };
                    // Wrap it up


@@ 430,9 432,9 @@ mod tests {
                    // Attach the token
                    output.set_completion_token(token);
                    // Send it up
                    Some(output)
                    (value, Some(output))
                } else {
                    None
                    (value, None)
                }
            },
            0,

M src/util/wrapped_channels.rs => src/util/wrapped_channels.rs +5 -5
@@ 154,19 154,19 @@ mod tests {
        let (tx, rx) = AsyncWrappedReceiver::<X, WrappedEvent<usize>>::new(Some(1));
        // Spawn up an actor
        let actor: AsyncActor<WrappedEvent<usize>, WrappedEvent<Output>, X> = AsyncActor::spawn(
            |value: &mut usize, mut add: WrappedEvent<usize>| {
            |mut value: usize, mut add: WrappedEvent<usize>| {
                // Grab the token, if there is one
                let token = add.token();
                let add = add.into_inner();
                // Do our addition
                *value += add;
                value += add;
                // See if we need to return
                if let Some(token) = token {
                    let mut event: WrappedEvent<Output> = Output { val: *value }.into();
                    let mut event: WrappedEvent<Output> = Output { val: value }.into();
                    event.set_completion_token(token);
                    Some(event)
                    (value, Some(event))
                } else {
                    None
                    (value, None)
                }
            },
            0,