~thatonelutenist/actm

fe0e5d93b40c01671d9bccd832d5808ee801238f — Nathan McCarty 1 year, 9 months ago 0bc2cf6
feat: Adaptor utility type

Add a utility type that wraps an `EventConsumer`, allowing it to consume
another type via a conversion closure.
M examples/grading-tests/main.rs => examples/grading-tests/main.rs +35 -6
@@ 8,9 8,12 @@ use nanorand::Rng;
use professor::ProfessorOutputType;

use crate::{
    professor::{AddStudent, Professor, ProfessorContext, ProfessorInputType},
    student::{Student, StudentContext, StudentInput},
    university::{University, UniversityContext},
    professor::{
        AddStudent, Professor, ProfessorContext, ProfessorInputType, ProfessorOutputEvent,
        SubmitTest,
    },
    student::{Student, StudentContext, StudentInput, StudentOutputEvent},
    university::{University, UniversityContext, UniversityInput},
};

pub mod professor;


@@ 44,7 47,18 @@ async fn main() {
    // Create a university and force it to listen to our professor
    let university = University::<AsyncStd>::new(UniversityContext::default(), None);
    professor
        .register_consumer(university.clone())
        .register_consumer(Adaptor::wrap(
            |e: ProfessorOutputEvent| {
                let e = e.into_inner();
                match e {
                    ProfessorOutputType::Grade(name, grade) => {
                        Some(UniversityInput::Grade(name, grade).into())
                    }
                    _ => None,
                }
            },
            university.clone(),
        ))
        .await
        .unwrap();
    professor.catchup().wait().await;


@@ 63,7 77,22 @@ async fn main() {
            );
            // Connect it to the professor
            println!("Hooking {name} up to professor");
            student.register_consumer(professor.clone()).await.unwrap();
            student
                .register_consumer(Adaptor::wrap(
                    |e: StudentOutputEvent| {
                        let e = e.into_inner();
                        Some(
                            ProfessorInputType::SubmitTest(SubmitTest {
                                answers: e.answers,
                                student: e.name,
                            })
                            .into(),
                        )
                    },
                    professor.clone(),
                ))
                .await
                .unwrap();
            // Tell the professor about it
            println!("Telling professor about {name}");
            professor


@@ 113,7 142,7 @@ async fn main() {
    university.catchup().wait().await;
    // Collect the results from our students
    let grades = university
        .call(ProfessorOutputType::Get.into())
        .call(UniversityInput::Get.into())
        .await
        .unwrap()
        .unwrap()

M examples/grading-tests/professor.rs => examples/grading-tests/professor.rs +0 -1
@@ 60,7 60,6 @@ pub enum ProfessorInputType {
pub enum ProfessorOutputType {
    Grade(String, u32),
    TestGraded,
    Get,
}

// Now generate the impls

M examples/grading-tests/student.rs => examples/grading-tests/student.rs +17 -9
@@ 1,10 1,7 @@
use actm::prelude::*;
use nanorand::Rng;

use crate::{
    professor::{ProfessorInputEvent, ProfessorInputType, SubmitTest},
    Answers, Test,
};
use crate::{Answers, Test};

// First we define the context type



@@ 21,14 18,24 @@ pub struct StudentInput {
    pub test: Test,
}

// Then the output event
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct StudentOutput {
    /// The answers to the test that the student just completed
    pub answers: Answers,
    /// The student's name
    pub name: String,
}

// Generate the impls

wrapped_event!(StudentInputEvent, StudentInput);
wrapped_event!(StudentOutputEvent, StudentOutput);

async fn student_event_handler(
    context: StudentContext,
    mut event: StudentInputEvent,
) -> (StudentContext, Option<ProfessorInputEvent>) {
) -> (StudentContext, Option<StudentOutputEvent>) {
    let token = event.token();
    let mut rng = nanorand::tls_rng();
    let results = event


@@ 39,10 46,11 @@ async fn student_event_handler(
        .map(|x| -> bool { x > rng.generate_range(1_u32..=100) })
        .collect::<Vec<_>>();
    let name = context.name.clone();
    let mut new_event = ProfessorInputEvent::from(ProfessorInputType::SubmitTest(SubmitTest {
    let mut new_event: StudentOutputEvent = StudentOutput {
        answers: Answers { answers: results },
        student: name,
    }));
        name,
    }
    .into();
    if let Some(token) = token {
        new_event.set_completion_token(token);
    }


@@ 52,7 60,7 @@ async fn student_event_handler(
async_actor!(
    Student,
    StudentInputEvent,
    ProfessorInputEvent,
    StudentOutputEvent,
    StudentContext,
    student_event_handler
);

M examples/grading-tests/university.rs => examples/grading-tests/university.rs +14 -6
@@ 2,7 2,7 @@ use std::collections::BTreeMap;

use actm::prelude::*;

use crate::professor::{ProfessorOutputEvent, ProfessorOutputType};


// First we define the context type



@@ 12,26 12,34 @@ pub struct UniversityContext {
    pub students: BTreeMap<String, Vec<u32>>,
}

// Then the input type

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub enum UniversityInput {
    Get,
    Grade(String, u32),
}

// Then the output type

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct UniversityOutput(pub BTreeMap<String, Vec<u32>>);

wrapped_event!(UniversityOutputEvent, UniversityOutput);
wrapped_event!(UniveristyInputEvent, UniversityInput);

async fn university_event_handler(
    mut context: UniversityContext,
    mut event: ProfessorOutputEvent,
    mut event: UniveristyInputEvent,
) -> (UniversityContext, Option<UniversityOutputEvent>) {
    let token = event.token();
    match event.into_inner() {
        ProfessorOutputType::Grade(student, grade) => {
        UniversityInput::Grade(student, grade) => {
            let grades = context.students.entry(student).or_default();
            grades.push(grade);
            (context, None)
        }
        ProfessorOutputType::TestGraded => (context, None),
        ProfessorOutputType::Get => {
        UniversityInput::Get => {
            let mut event = UniversityOutputEvent::from(UniversityOutput(context.students.clone()));
            if let Some(token) = token {
                event.set_completion_token(token);


@@ 43,7 51,7 @@ async fn university_event_handler(

async_actor!(
    University,
    ProfessorOutputEvent,
    UniveristyInputEvent,
    UniversityOutputEvent,
    UniversityContext,
    university_event_handler

M src/lib.rs => src/lib.rs +1 -1
@@ 38,7 38,7 @@ pub mod prelude {
        sync_actor,
        traits::{Actor, Event, EventConsumer, EventProducer},
        types::{CompletionToken, Trigger, Waiter},
        util::{AsyncActor, AsyncActorError, SyncActor, SyncActorError, WrappedEvent},
        util::{Adaptor, AsyncActor, AsyncActorError, SyncActor, SyncActorError, WrappedEvent},
        wrapped_event,
    };
}

M src/util.rs => src/util.rs +2 -0
@@ 2,6 2,7 @@

mod async_actor;
mod collector;
mod consumer_adaptor;
pub(crate) mod either_error;
mod sync_actor;
mod token_manager;


@@ 10,6 11,7 @@ mod wrapped_event;

pub use async_actor::{AsyncActor, AsyncActorError};
pub use collector::{Collector, CollectorOutput};
pub use consumer_adaptor::Adaptor;
pub use either_error::EitherError;
pub use sync_actor::{SyncActor, SyncActorError};
pub use token_manager::TokenManager;

A src/util/consumer_adaptor.rs => src/util/consumer_adaptor.rs +40 -0
@@ 0,0 1,40 @@
//! Adapter for changing the type of a consumed value when registering a consumer

use async_trait::async_trait;

use crate::traits::{Event, EventConsumer};

/// Adaptor for registering a consumer
pub struct Adaptor<I: Event, O: Event, C: EventConsumer<O>> {
    /// The closure used to map the call
    closure: Box<dyn Fn(I) -> Option<O> + Send + Sync>,
    /// The wrapped consumer
    consumer: C,
}

impl<I: Event, O: Event, C: EventConsumer<O>> Adaptor<I, O, C> {
    /// Wrap another event consumer with the provided closure
    pub fn wrap(closure: impl Fn(I) -> Option<O> + Send + Sync + 'static, consumer: C) -> Self {
        Self {
            closure: Box::new(closure),
            consumer,
        }
    }
}

#[async_trait]
impl<I: Event, O: Event, C: EventConsumer<O>> EventConsumer<I> for Adaptor<I, O, C> {
    type Error = <C as EventConsumer<O>>::Error;
    async fn accept(&self, event: I) -> Result<(), Self::Error> {
        match (self.closure)(event) {
            Some(x) => self.consumer.accept(x).await,
            None => Ok(()),
        }
    }
    fn accept_sync(&self, event: I) -> Result<(), Self::Error> {
        match (self.closure)(event) {
            Some(x) => self.consumer.accept_sync(x),
            None => Ok(()),
        }
    }
}