~joshleeb/takoto

9ccc601f550738ff96580e557df848405e759101 — Josh Leeb-du Toit 22 days ago 423e94e
Impl QueueClient for InMemoryQueueClient
3 files changed, 351 insertions(+), 16 deletions(-)

M takoto-sqs/src/in_memory.rs
A takoto-sqs/src/in_memory/client.rs
M takoto-sqs/src/lib.rs
M takoto-sqs/src/in_memory.rs => takoto-sqs/src/in_memory.rs +7 -15
@@ 1,19 1,11 @@
use crate::{Message, QueueClient};
use async_trait::async_trait;
pub use client::{InMemoryQueueClient, InMemoryQueueClientBuilder};

pub struct InMemoryQueueClient {}
use crate::{config::Config, item::Item};

#[async_trait]
impl QueueClient for InMemoryQueueClient {
    async fn send_message(&self, _queue_url: &str, _value: String) -> crate::Result<String> {
        todo!()
    }
mod client;

    async fn recv_message(&self, _queue_url: &str) -> crate::Result<Option<Message>> {
        todo!()
    }

    async fn delete_message(&self, _queue_url: &str, _receipt_handle: String) -> crate::Result<()> {
        todo!()
    }
#[derive(Debug, Default)]
struct Queue {
    pub items: Vec<Item>,
    pub config: Config,
}

A takoto-sqs/src/in_memory/client.rs => takoto-sqs/src/in_memory/client.rs +343 -0
@@ 0,0 1,343 @@
use crate::{
    check_message_len, config::Config, in_memory::Queue, item::Item, Error, Message, QueueClient,
};
use async_trait::async_trait;
use clock::Clock;
use std::{
    collections::HashMap,
    sync::{Arc, Mutex, MutexGuard},
};

#[derive(Clone)]
pub struct InMemoryQueueClientBuilder {
    clock: Arc<dyn Clock>,
    queue_configs: Vec<(String, Config)>,
}

impl InMemoryQueueClientBuilder {
    pub fn with_queue<S: ToString>(mut self, queue_url: S, config: Config) -> Self {
        self.queue_configs.push((queue_url.to_string(), config));
        self
    }

    pub fn build(self) -> InMemoryQueueClient {
        let queues = self.queue_configs.into_iter().map(|(url, config)| {
            let queue = Queue { config, ..Queue::default() };
            (url, (Mutex::new(queue)))
        });
        InMemoryQueueClient { clock: self.clock, queues: Arc::new(queues.collect()) }
    }
}

#[derive(Clone)]
pub struct InMemoryQueueClient {
    clock: Arc<dyn Clock>,
    queues: Arc<HashMap<String, Mutex<Queue>>>,
}

impl InMemoryQueueClient {
    pub fn builder(clock: Arc<dyn Clock>) -> InMemoryQueueClientBuilder {
        InMemoryQueueClientBuilder { clock, queue_configs: vec![] }
    }

    fn queue(&self, queue_url: &str) -> crate::Result<MutexGuard<Queue>> {
        let queue =
            self.queues.get(queue_url).ok_or_else(|| Error::QueueNotFound(queue_url.into()))?;
        Ok(queue.lock().unwrap())
    }
}

#[async_trait]
impl QueueClient for InMemoryQueueClient {
    async fn send_message(&self, queue_url: &str, value: String) -> crate::Result<String> {
        check_message_len(&value)?;
        let item = Item::new(value);
        let item_id = item.id.clone();

        let mut queue = self.queue(queue_url)?;
        queue.items.push(item);
        Ok(item_id)
    }

    async fn recv_message(&self, queue_url: &str) -> crate::Result<Option<Message>> {
        let mut claimed = None;
        let mut dlq_items = vec![];

        let mut queue = self.queue(queue_url)?;
        let mut index = 0;
        while index < queue.items.len() {
            // Item is in flight, so we should skip over it as presumably another process is
            // currently working with this item.
            if queue.items[index].is_visible(self.clock.now()) {
                index += 1;
                continue;
            }

            // The item has been received more than the maximum receive count, and so it should be
            // removed from the current queue (to send to the dlq).
            if queue.items[index].recv_count > queue.config.redrive.max_recv_count {
                let item = queue.items.remove(index);
                dlq_items.push(item);
                continue;
            }

            // On the first iteration where this stage it reached with an item, we should claim it
            // as this will be the item that is returned as a Message.
            if claimed.is_none() {
                let visibility_period = queue.config.visibility_period;
                queue.items[index].claim(self.clock.now(), visibility_period);
                claimed = Some(queue.items[index].clone());
            }
            index += 1;
        }

        // If there is a dlq specified in the `RedrivePolicy` for the queue, then send any
        // `dlq_items` to the dlq.
        if let Some(dlq_url) = &queue.config.redrive.dlq_url {
            let mut dlq = self.queue(dlq_url)?;
            dlq.items.extend(dlq_items.into_iter().map(Item::reset))
        }
        Ok(claimed.map(Message::from))
    }

    async fn delete_message(&self, queue_url: &str, receipt_handle: String) -> crate::Result<()> {
        let mut queue = self.queue(queue_url)?;
        queue.items.retain(|item| match &item.handle {
            Some(handle) => handle.id != receipt_handle,
            _ => true,
        });
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{Config, RedrivePolicy};
    use clock::FakeClock;
    use std::time::Duration;

    #[tokio::test]
    async fn message_body() -> crate::Result<()> {
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(clock).with_queue("a", Config::default()).build();
        let message_id = queues.send_message("a", "foobar".into()).await?;

        let message = queues.recv_message("a").await?.unwrap();
        assert_eq!(message.id, message_id);
        assert_eq!(message.body, "foobar");
        Ok(())
    }

    #[tokio::test]
    async fn message_order() -> crate::Result<()> {
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(clock).with_queue("a", Config::default()).build();
        let message_ids = vec![
            queues.send_message("a", "foobar".into()).await?,
            queues.send_message("a", "bazqux".into()).await?,
        ];

        for id in message_ids {
            let message = queues.recv_message("a").await?.unwrap();
            assert_eq!(message.id, id);

            queues.delete_message("a", message.receipt_handle).await?;
        }
        assert!(queues.recv_message("a").await?.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn message_visibility() -> crate::Result<()> {
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(clock).with_queue("a", Config::default()).build();
        let message_ids = vec![
            queues.send_message("a", "foobar".into()).await?,
            queues.send_message("a", "bazqux".into()).await?,
        ];

        for id in message_ids {
            let message = queues.recv_message("a").await?.unwrap();
            assert_eq!(message.id, id);
        }
        assert!(queues.recv_message("a").await?.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn message_visibility_timeout() -> crate::Result<()> {
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(Arc::clone(&clock) as Arc<dyn Clock>)
            .with_queue(
                "a",
                Config { visibility_period: Duration::from_secs(1), ..Default::default() },
            )
            .build();

        let message1_id = queues.send_message("a", "foobar".into()).await?;
        let message2_id = queues.send_message("a", "bazqux".into()).await?;

        let message1 = queues.recv_message("a").await?.unwrap();
        assert_eq!(message1.id, message1_id);

        clock.advance(Duration::from_secs(2));

        let message2 = queues.recv_message("a").await?.unwrap();
        assert_eq!(message2.id, message1_id);
        let message3 = queues.recv_message("a").await?.unwrap();
        assert_eq!(message3.id, message2_id);
        Ok(())
    }

    #[tokio::test]
    async fn dlq_messages() -> crate::Result<()> {
        let redrive = RedrivePolicy { max_recv_count: 1, dlq_url: Some("a-dlq".into()) };
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(Arc::clone(&clock) as Arc<dyn Clock>)
            .with_queue("a", Config { redrive, visibility_period: Duration::from_secs(1) })
            .with_queue("a-dlq", Config::default())
            .build();

        let message_id = queues.send_message("a", "foobar".into()).await?;

        // Consume the recv_count for the message in the queue.
        for _ in 0..2 {
            let message = queues.recv_message("a").await?.unwrap();
            assert_eq!(message.id, message_id);
            clock.advance(Duration::from_secs(2));
        }

        // Now when we recv, the message should be send to the DLQ. So we should get nothing.
        assert!(queues.recv_message("a").await?.is_none());

        // Check the message has been sent to the DLQ.
        let dlq_message = queues.recv_message("a-dlq").await?.unwrap();
        assert_eq!(dlq_message.id, message_id);
        assert_eq!(dlq_message.body, "foobar");
        Ok(())
    }

    #[tokio::test]
    async fn recv_message_from_empty_queue() -> crate::Result<()> {
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(clock).with_queue("a", Config::default()).build();

        let message = queues.recv_message("a").await.unwrap();
        assert!(message.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn delete_message_missing_handle_id() -> crate::Result<()> {
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(clock).with_queue("a", Config::default()).build();

        let res = queues.delete_message("a", "handle_id".into()).await;
        assert!(res.is_ok());
        Ok(())
    }

    #[tokio::test]
    async fn delete_message_after_visibility_period() -> crate::Result<()> {
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(Arc::clone(&clock) as Arc<dyn Clock>)
            .with_queue("a", Config::default())
            .build();

        let message_id = queues.send_message("a", "foobar".into()).await?;
        let message = queues.recv_message("a").await?.unwrap();
        assert_eq!(message.id, message_id);

        // Even though the visibility period is over, since there have been no new `recv_message`
        // of the message, we should still be able to delete it from the queue.
        clock.advance(Duration::from_secs(2));
        queues.delete_message("a", message.receipt_handle).await?;
        assert!(queues.recv_message("a").await?.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn delete_message_after_another_recv() -> crate::Result<()> {
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(Arc::clone(&clock) as Arc<dyn Clock>)
            .with_queue(
                "a",
                Config { visibility_period: Duration::from_secs(1), ..Default::default() },
            )
            .build();

        let message_id = queues.send_message("a", "foobar".into()).await?;
        let message1 = queues.recv_message("a").await?.unwrap();
        assert_eq!(message1.id, message_id);

        clock.advance(Duration::from_secs(2));
        let message2 = queues.recv_message("a").await?.unwrap();
        assert_eq!(message2.id, message_id);

        queues.delete_message("a", message1.receipt_handle).await?;

        // Message shouldn't have been deleted since second `recv_message` updated the
        // `receipt_handle` to a new value.
        clock.advance(Duration::from_secs(2));
        let message3 = queues.recv_message("a").await?.unwrap();
        assert_eq!(message3.id, message_id);
        Ok(())
    }

    mod missing_queue {
        use super::*;

        #[tokio::test]
        async fn send_message() -> crate::Result<()> {
            let clock = Arc::new(FakeClock::default());
            let queues = InMemoryQueueClient::builder(clock).build();

            match queues.send_message("a", "foobar".into()).await {
                Err(Error::QueueNotFound(queue_url)) if queue_url == "a" => {}
                x => panic!("expected: Error::QueueNotFound, got: {:?}", x),
            }
            Ok(())
        }

        #[tokio::test]
        async fn recv_message() -> crate::Result<()> {
            let clock = Arc::new(FakeClock::default());
            let queues = InMemoryQueueClient::builder(clock).build();

            match queues.recv_message("a").await {
                Err(Error::QueueNotFound(queue_url)) if queue_url == "a" => {}
                x => panic!("expected: Error::QueueNotFound, got: {:?}", x),
            }
            Ok(())
        }

        #[tokio::test]
        async fn delete_message() -> crate::Result<()> {
            let clock = Arc::new(FakeClock::default());
            let queues = InMemoryQueueClient::builder(clock).build();

            match queues.delete_message("a", "handle_id".into()).await {
                Err(Error::QueueNotFound(queue_url)) if queue_url == "a" => {}
                x => panic!("expected: Error::QueueNotFound, got: {:?}", x),
            }
            Ok(())
        }
    }

    #[tokio::test]
    async fn queue_isolation() -> crate::Result<()> {
        let clock = Arc::new(FakeClock::default());
        let queues = InMemoryQueueClient::builder(clock)
            .with_queue("a", Config::default())
            .with_queue("b", Config::default())
            .build();

        let message_id = queues.send_message("a", "foobar".into()).await?;
        assert!(queues.recv_message("b").await?.is_none());

        let message = queues.recv_message("a").await?.unwrap();
        assert_eq!(message.id, message_id);
        Ok(())
    }
}

M takoto-sqs/src/lib.rs => takoto-sqs/src/lib.rs +1 -1
@@ 1,6 1,6 @@
pub use error::{Error, Result};
pub use file::FileQueueClient;
pub use in_memory::InMemoryQueueClient;
pub use in_memory::{InMemoryQueueClient, InMemoryQueueClientBuilder};
pub use sqs::SqsQueueClient;

use async_trait::async_trait;