From 69657b11139d9db62ad1a979a94b5287055aa5ae Mon Sep 17 00:00:00 2001 From: Nathan McCarty Date: Wed, 16 Nov 2022 20:44:56 -0500 Subject: [PATCH] refactor!: Use owned context for AsyncActor Make AsyncActor use a logic closure that takes in an owned context and passes it back out, this is more complicated to program, but saves dealing with lifetime hell --- src/util/async_actor.rs | 38 +++++++++++++++++++----------------- src/util/wrapped_channels.rs | 10 +++++----- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/src/util/async_actor.rs b/src/util/async_actor.rs index 14b7bb8..1544901 100644 --- a/src/util/async_actor.rs +++ b/src/util/async_actor.rs @@ -71,9 +71,9 @@ impl AsyncActor { /// If `limit` is `Some(_)`, then a bounded queue with the specified limit will be created, /// otherwise an unbounded queue will be used. #[instrument(skip(logic, context))] - pub fn spawn(mut logic: F, context: C, bound: Option) -> Self + pub fn spawn(logic: F, context: C, bound: Option) -> Self where - F: FnMut(&mut C, I) -> Option + Send + Sync + 'static, + F: Fn(C, I) -> (C, Option) + Send + Sync + 'static, C: Send + Sync + 'static, { Self::spawn_async( @@ -91,10 +91,10 @@ impl AsyncActor { /// /// Otherwise behaves identically to [`AsyncActor::spawn`]. #[instrument(skip(logic, context))] - pub fn spawn_async(mut logic: F, mut context: C, bound: Option) -> Self + pub fn spawn_async(logic: F, mut context: C, bound: Option) -> Self where - R: Future> + Send + 'static, - F: FnMut(&mut C, I) -> R + Send + 'static, + R: Future)> + Send, + F: Fn(C, I) -> R + Send + Sync + 'static, C: Send + Sync + 'static, { /// Counter for generating logging ids @@ -149,7 +149,9 @@ impl AsyncActor { // Process the event with the caller provided closure and context, // and then perform bookkeeping if the closure returns an outbound // event - if let Some(output) = logic(&mut context, x).await { + let (new_context, output) = logic(context, x).await; + context = new_context; + if let Some(output) = output { // First process the event through the token manager, so any // callbacks can be called back let output = token_manager.process(output); @@ -297,19 +299,19 @@ mod tests { async fn smoke() { // Create our actor let actor: AsyncActor = AsyncActor::spawn( - |value: &mut i64, mut math: MathEvent| { + |mut value: i64, mut math: MathEvent| { // Pull out the completion token, if there is any let token = math.token(); // Perform the operation - let old_value = *value; + let old_value = value; let math = math.into_inner(); - *value = math.operate(*value); + value = math.operate(value); // Check to see if there was a completion token, if so, send back an Output if let Some(token) = token { // Make our output let output = Output { before: old_value, - after: *value, + after: value, input: math, }; // Wrap it up @@ -317,9 +319,9 @@ mod tests { // Attach the token output.set_completion_token(token); // Send it up - Some(output) + (value, Some(output)) } else { - None + (value, None) } }, 0, @@ -410,19 +412,19 @@ mod tests { fn smoke_threads_sync() { // Create our actor let actor: AsyncActor = AsyncActor::spawn( - |value: &mut i64, mut math: MathEvent| { + |mut value: i64, mut math: MathEvent| { // Pull out the completion token, if there is any let token = math.token(); // Perform the operation - let old_value = *value; + let old_value = value; let math = math.into_inner(); - *value = math.operate(*value); + value = math.operate(value); // Check to see if there was a completion token, if so, send back an Output if let Some(token) = token { // Make our output let output = Output { before: old_value, - after: *value, + after: value, input: math, }; // Wrap it up @@ -430,9 +432,9 @@ mod tests { // Attach the token output.set_completion_token(token); // Send it up - Some(output) + (value, Some(output)) } else { - None + (value, None) } }, 0, diff --git a/src/util/wrapped_channels.rs b/src/util/wrapped_channels.rs index a053955..abb22d4 100644 --- a/src/util/wrapped_channels.rs +++ b/src/util/wrapped_channels.rs @@ -154,19 +154,19 @@ mod tests { let (tx, rx) = AsyncWrappedReceiver::>::new(Some(1)); // Spawn up an actor let actor: AsyncActor, WrappedEvent, X> = AsyncActor::spawn( - |value: &mut usize, mut add: WrappedEvent| { + |mut value: usize, mut add: WrappedEvent| { // Grab the token, if there is one let token = add.token(); let add = add.into_inner(); // Do our addition - *value += add; + value += add; // See if we need to return if let Some(token) = token { - let mut event: WrappedEvent = Output { val: *value }.into(); + let mut event: WrappedEvent = Output { val: value }.into(); event.set_completion_token(token); - Some(event) + (value, Some(event)) } else { - None + (value, None) } }, 0, -- 2.45.2