~joshleeb/takoto

41769ecea09469314985685514ce1aaaf39afcdd — Josh Leeb-du Toit 27 days ago 7e29e74 master
Map RusotoError::Unknown response to specific Error variant

In this patch, we map a response with the
AWS.SimpleQueueService.NonExistentQueue error code to
Error::QueueNotFound.
3 files changed, 41 insertions(+), 23 deletions(-)

M takoto-sqs/Cargo.toml
M takoto-sqs/src/error.rs
M takoto-sqs/src/sqs.rs
M takoto-sqs/Cargo.toml => takoto-sqs/Cargo.toml +3 -1
@@ 9,9 9,11 @@ publish = false
async-trait = "0.1.41"
base64 = "0.13.0"
clock = "0.2.0"
http = "0.2.2"
rusoto_core = "0.45.0"
rusoto_sqs = "0.45.0"
serde = { version = "1.0.117", features = ["derive"] }
serde = { version = "1.0.118", features = ["derive"] }
serde-xml-rs = "0.4.0"
serde_json = "1.0.59"
tempfile = "3.1.0"
thiserror = "1.0.21"

M takoto-sqs/src/error.rs => takoto-sqs/src/error.rs +32 -16
@@ 1,5 1,6 @@
use rusoto_core::RusotoError;
use rusoto_sqs::{DeleteMessageError, ReceiveMessageError, SendMessageError};
use http::StatusCode;
use rusoto_core::{request::BufferedHttpResponse, RusotoError};
use serde::Deserialize;
use std::{error::Error as StdError, io, ops::RangeInclusive, result::Result as StdResult, string};
use thiserror::Error;



@@ 24,6 25,23 @@ pub enum Error {
    Other(Box<dyn StdError>),
}

impl Error {
    pub(crate) fn from_rusoto<T>(queue_url: &str, err: RusotoError<T>) -> Self
    where
        RusotoError<T>: Into<Box<dyn StdError>>,
    {
        if let RusotoError::Unknown(BufferedHttpResponse { status, body, .. }) = &err {
            let resp: AwsErrorResponse = serde_xml_rs::from_reader(body.as_ref()).unwrap();
            if status == &StatusCode::BAD_REQUEST
                && resp.error.code == "AWS.SimpleQueueService.NonExistentQueue"
            {
                return Self::QueueNotFound(queue_url.into());
            }
        }
        Error::Other(err.into())
    }
}

impl From<string::FromUtf8Error> for Error {
    fn from(value: string::FromUtf8Error) -> Self {
        Self::from(io::Error::new(io::ErrorKind::InvalidData, value))


@@ 54,20 72,18 @@ impl From<toml::de::Error> for Error {
    }
}

impl From<RusotoError<SendMessageError>> for Error {
    fn from(value: RusotoError<SendMessageError>) -> Self {
        Error::Other(value.into())
    }
}

impl From<RusotoError<ReceiveMessageError>> for Error {
    fn from(value: RusotoError<ReceiveMessageError>) -> Self {
        Error::Other(value.into())
    }
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct AwsErrorResponse {
    error: AwsError,
    request_id: String,
}

impl From<RusotoError<DeleteMessageError>> for Error {
    fn from(value: RusotoError<DeleteMessageError>) -> Self {
        Error::Other(value.into())
    }
#[derive(Debug, Deserialize)]
#[serde(rename_all = "PascalCase")]
struct AwsError {
    #[serde(rename = "Type")]
    kind: String,
    code: String,
    message: String,
}

M takoto-sqs/src/sqs.rs => takoto-sqs/src/sqs.rs +6 -6
@@ 1,4 1,4 @@
use crate::{check_message_len, Message, QueueClient};
use crate::{check_message_len, error::Error, Message, QueueClient};
use async_trait::async_trait;
use rusoto_sqs::{DeleteMessageRequest, ReceiveMessageRequest, SendMessageRequest, Sqs};
use std::fmt::{self, Debug};


@@ 19,11 19,9 @@ impl Debug for SqsQueueClient {
    }
}

// TODO: Ensure that `Error::QueueNotFound` when the queue is missing, rather than `Error::Other`.
#[async_trait]
impl QueueClient for SqsQueueClient {
    async fn send_message(&self, queue_url: &str, value: String) -> crate::Result<String> {
        // TODO: Handle when `MaximumMessageSize` is configured for a queue.
        check_message_len(&value)?;

        let req = SendMessageRequest {


@@ 31,13 29,15 @@ impl QueueClient for SqsQueueClient {
            message_body: value,
            ..Default::default()
        };
        let resp = self.sqs.send_message(req).await?;
        let resp =
            self.sqs.send_message(req).await.map_err(|e| Error::from_rusoto(queue_url, e))?;
        Ok(resp.message_id.expect("message id"))
    }

    async fn recv_message(&self, queue_url: &str) -> crate::Result<Option<Message>> {
        let req = ReceiveMessageRequest { queue_url: queue_url.into(), ..Default::default() };
        let resp = self.sqs.receive_message(req).await?;
        let resp =
            self.sqs.receive_message(req).await.map_err(|e| Error::from_rusoto(queue_url, e))?;
        if let Some(mut messages) = resp.messages {
            if !messages.is_empty() {
                return Ok(messages.pop().map(Message::from));


@@ 48,6 48,6 @@ impl QueueClient for SqsQueueClient {

    async fn delete_message(&self, queue_url: &str, receipt_handle: String) -> crate::Result<()> {
        let req = DeleteMessageRequest { queue_url: queue_url.into(), receipt_handle };
        self.sqs.delete_message(req).await.map_err(|e| e.into())
        self.sqs.delete_message(req).await.map_err(|e| Error::from_rusoto(queue_url, e))
    }
}