~joshleeb/takoto

423e94e2d831466aeeb6ea918745dc7051f1fe0f — Josh Leeb-du Toit 22 days ago ab61c01
Impl QueueClient for SqsQueueClient
4 files changed, 78 insertions(+), 9 deletions(-)

M takoto-sqs/Cargo.toml
M takoto-sqs/src/error.rs
M takoto-sqs/src/lib.rs
M takoto-sqs/src/sqs.rs
M takoto-sqs/Cargo.toml => takoto-sqs/Cargo.toml +2 -0
@@ 9,6 9,8 @@ publish = false
async-trait = "0.1.41"
base64 = "0.13.0"
clock = "0.2.0"
rusoto_core = "0.45.0"
rusoto_sqs = "0.45.0"
serde = { version = "1.0.117", features = ["derive"] }
serde_json = "1.0.59"
tempfile = "3.1.0"

M takoto-sqs/src/error.rs => takoto-sqs/src/error.rs +24 -1
@@ 1,4 1,6 @@
use std::{io, ops::RangeInclusive, result::Result as StdResult, string};
use rusoto_core::RusotoError;
use rusoto_sqs::{DeleteMessageError, ReceiveMessageError, SendMessageError};
use std::{error::Error as StdError, io, ops::RangeInclusive, result::Result as StdResult, string};
use thiserror::Error;

pub type Result<T> = StdResult<T, Error>;


@@ 13,6 15,9 @@ pub enum Error {

    #[error(transparent)]
    Io(#[from] io::Error),

    #[error(transparent)]
    Other(Box<dyn StdError>),
}

impl From<string::FromUtf8Error> for Error {


@@ 44,3 49,21 @@ impl From<toml::de::Error> for Error {
        Self::from(io::Error::new(io::ErrorKind::Other, value))
    }
}

impl From<RusotoError<SendMessageError>> for Error {
    fn from(value: RusotoError<SendMessageError>) -> Self {
        Error::Other(value.into())
    }
}

impl From<RusotoError<ReceiveMessageError>> for Error {
    fn from(value: RusotoError<ReceiveMessageError>) -> Self {
        Error::Other(value.into())
    }
}

impl From<RusotoError<DeleteMessageError>> for Error {
    fn from(value: RusotoError<DeleteMessageError>) -> Self {
        Error::Other(value.into())
    }
}

M takoto-sqs/src/lib.rs => takoto-sqs/src/lib.rs +10 -0
@@ 51,6 51,16 @@ impl From<Item> for Message {
    }
}

impl From<rusoto_sqs::Message> for Message {
    fn from(value: rusoto_sqs::Message) -> Self {
        Self {
            id: value.message_id.expect("message id"),
            body: value.body.expect("message body"),
            receipt_handle: value.receipt_handle.expect("message receipt_handle"),
        }
    }
}

fn check_message_len(body: &str) -> Result<()> {
    if body.is_empty() || body.len() > MESSAGE_MAX_LEN_BYTES {
        return Err(Error::InvalidMessageLength {

M takoto-sqs/src/sqs.rs => takoto-sqs/src/sqs.rs +42 -8
@@ 1,19 1,53 @@
use crate::{Message, QueueClient};
use crate::{check_message_len, Message, QueueClient};
use async_trait::async_trait;
use rusoto_sqs::{DeleteMessageRequest, ReceiveMessageRequest, SendMessageRequest, Sqs};
use std::fmt::{self, Debug};

pub struct SqsQueueClient {}
pub struct SqsQueueClient {
    sqs: rusoto_sqs::SqsClient,
}

impl SqsQueueClient {
    pub fn new(sqs: rusoto_sqs::SqsClient) -> Self {
        Self { sqs }
    }
}

impl Debug for SqsQueueClient {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "SqsClient")
    }
}

// TODO: Ensure that `Error::QueueNotFound` when the queue is missing, rather than `Error::Other`.
#[async_trait]
impl QueueClient for SqsQueueClient {
    async fn send_message(&self, _queue_url: &str, _value: String) -> crate::Result<String> {
        todo!()
    async fn send_message(&self, queue_url: &str, value: String) -> crate::Result<String> {
        // TODO: Handle when `MaximumMessageSize` is configured for a queue.
        check_message_len(&value)?;

        let req = SendMessageRequest {
            queue_url: queue_url.into(),
            message_body: value,
            ..Default::default()
        };
        let resp = self.sqs.send_message(req).await?;
        Ok(resp.message_id.expect("message id"))
    }

    async fn recv_message(&self, _queue_url: &str) -> crate::Result<Option<Message>> {
        todo!()
    async fn recv_message(&self, queue_url: &str) -> crate::Result<Option<Message>> {
        let req = ReceiveMessageRequest { queue_url: queue_url.into(), ..Default::default() };
        let resp = self.sqs.receive_message(req).await?;
        if let Some(mut messages) = resp.messages {
            if !messages.is_empty() {
                return Ok(messages.pop().map(Message::from));
            }
        }
        Ok(None)
    }

    async fn delete_message(&self, _queue_url: &str, _receipt_handle: String) -> crate::Result<()> {
        todo!()
    async fn delete_message(&self, queue_url: &str, receipt_handle: String) -> crate::Result<()> {
        let req = DeleteMessageRequest { queue_url: queue_url.into(), receipt_handle };
        self.sqs.delete_message(req).await.map_err(|e| e.into())
    }
}