~hime/aqua

033c98982a7a96ce42125f6a1f8b2f669aa07ff5 — Robbie Straw 7 years ago ed56040
way better error handling for aqua-watch
1 files changed, 128 insertions(+), 59 deletions(-)

M src/bin/aqua_watch.rs
M src/bin/aqua_watch.rs => src/bin/aqua_watch.rs +128 -59
@@ 39,13 39,78 @@ use diesel::prelude::*;
use diesel::pg::PgConnection;
use dotenv::dotenv;
use notify::{DebouncedEvent, Watcher, RecursiveMode, watcher};
use std::env;
use std::{env, fmt};
use std::convert::From;
use std::error::Error;
use std::fs::{self, File, OpenOptions};
use std::io::{Read, Write};
use std::path::PathBuf;
use std::io::{self, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::mpsc::channel;
use std::time::Duration;

#[derive(Debug)]
enum ProcessingError {
    DigestFailed,
    DetectionFailed,
    ThumbnailFailed,
    DbConnErr(diesel::ConnectionError),
    DbQueryErr(diesel::result::Error),
    IoErr(io::Error),
    ImageErr(image::ImageError),
}

impl Error for ProcessingError {
    fn description(&self) -> &str {
        match *self {
            // internal errors
            ProcessingError::DigestFailed      => "Unhandled error while generating SHA256 digest",
            ProcessingError::DetectionFailed   => "The file's type could not be detected",
            ProcessingError::ThumbnailFailed   => "The thumbnail could not be generated",

            // external errors
            ProcessingError::DbConnErr(ref inner)  => inner.description(),
            ProcessingError::DbQueryErr(ref inner) => inner.description(),
            ProcessingError::IoErr(ref inner)      => inner.description(),
            ProcessingError::ImageErr(ref inner)   => inner.description(),
        }
    }

    fn cause(&self) -> Option<&Error> {
        match *self {
            ProcessingError::DbConnErr(ref err)  => Some(err),
            ProcessingError::DbQueryErr(ref err) => Some(err),
            ProcessingError::IoErr(ref err)      => Some(err),
            ProcessingError::ImageErr(ref err)   => Some(err),
            _ => None,
        }
    }
}

impl fmt::Display for ProcessingError {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        match *self {
            // TODO: better display impl.
            _ => write!(f, "{}", self.description()),
        }
    }
}

impl From<diesel::ConnectionError> for ProcessingError {
    fn from(err: diesel::ConnectionError) -> Self { ProcessingError::DbConnErr(err) }
}

impl From<diesel::result::Error> for ProcessingError {
    fn from(err: diesel::result::Error) -> Self { ProcessingError::DbQueryErr(err) }
}

impl From<image::ImageError> for ProcessingError {
    fn from(err: image::ImageError) -> Self { ProcessingError::ImageErr(err) }
}

impl From<io::Error> for ProcessingError {
    fn from(err: io::Error) -> Self { ProcessingError::IoErr(err) }
}

fn main() {
    dotenv().expect("must provide .env file, see README (TODO: haha jk)");
    env_logger::init().expect("could not initialize console logging");


@@ 81,7 146,12 @@ fn main() {
    loop {
        match fs_rx.recv() {
            Ok(DebouncedEvent::Create(path)) => {
                if path.is_file() { handle_new_file(path, content_store) }
                if path.is_file() { 
                    match handle_new_file(path, content_store) {
                        Ok(_res) => info!("file processed successfully ..."),
                        Err(msg) => warn!("could not process file: {}", msg.description()),
                    };
                }
                else { info!("directory created, ignoring ..."); }
            },
            Ok(event) => info!("unhandled evt: {:?}", event),


@@ 90,97 160,96 @@ fn main() {
    }
}

fn handle_new_file(path: PathBuf, content_store: &str) {
fn handle_new_file(path: PathBuf, content_store: &str) -> Result<(), ProcessingError> {
    let digest = hash_file(path.as_path())
        .expect("could not get digest for file (!!!)");
        .ok_or(ProcessingError::DigestFailed)?;

    let mut file = OpenOptions::new()
        .read(true)
        .write(false)
        .create_new(false)
        .open(path.as_path())
        .expect("could not open image file");
        .open(path.as_path())?;

    // read file into memory
    let mut buf = vec![];
    file.read_to_end(&mut buf).expect("could not read file");
    file.read_to_end(&mut buf)?;

    if let Some(image_metadata) = aqua::util::mime_detect(&buf) {
        info!("got an image ...");
        process_image(content_store, &digest, &buf)?;
        move_file(path.as_path(), content_store, &digest, image_metadata.extension())?;

    let (mime_ty, file_ext) = match aqua::util::mime_detect(&buf) {
        Some(file_ty) => {
            process_image(content_store, &digest, &buf);
            (file_ty.mime().to_string(), file_ty.extension().to_string())
        },
        let db_entry = create_db_entry(&digest, image_metadata.mime())?;
        info!("inserted: {:?} into database", db_entry);

        None => {
            // TODO: thumbnail a video, properly figure out mimety
            let file_extension = path.extension()
                .unwrap()
                .to_string_lossy();

            (format!("video/{}", file_extension), file_extension.into_owned())
            
        },
    };
        Ok(())
    } else if let Some(_nil) = ffmpeg_detect(path.as_path()) {
        unreachable!()
    } else {
        Err(ProcessingError::DetectionFailed)
    }
}

    // carve out a bucket based on first byte of SHA256 digest
    // create the bucket if it does not exist
    let file_bucket    = format!("f{}", &digest[0..2]);
    let file_filename  = format!("{}.{}", &digest, file_ext);
fn ffmpeg_detect(path: &Path) -> Option<()> {
    None
}

    // move file to content store
    let dest = PathBuf::from(content_store)
        .join(file_bucket)
        .join(file_filename);
fn establish_connection() -> Result<PgConnection, ProcessingError> {
    let database_url = env::var("DATABASE_URL")
        .expect("DATABASE_URL not set in `.env` file !!!");

    fs::create_dir_all(dest.parent().unwrap()).expect("could not create file bucket");   
    fs::rename(path, dest)
        .expect("could not move file to content store");
    Ok(PgConnection::establish(&database_url)?)
}

    // create entry in database
    let pg_conn = establish_connection();
// create entry in database
fn create_db_entry(digest: &str, mime_ty: &str) -> Result<Entry, ProcessingError> {
    let pg_conn = establish_connection()?;
    let aqua_entry = NewEntry { hash: &digest, mime: Some(&mime_ty) };
    let entry: Result<Entry, diesel::result::Error> = diesel::insert(&aqua_entry)
        .into(schema::entries::table)
        .get_result(&pg_conn);

    match entry {
        Ok(_entry) => info!("entry added to database: {}", digest),
        Err(msg) => warn!("could not store entry in database: {}", msg),
    };
    Ok(entry?)
}

pub fn establish_connection() -> PgConnection {
    let database_url = env::var("DATABASE_URL")
        .expect("DATABASE_URL not set");
// moves the file from `src_path` to the `content_store` based on its digest
fn move_file(src_path: &Path, content_store: &str, digest: &str, file_ext: &str) -> Result<(), ProcessingError> {
    // carve out a bucket based on first byte of SHA256 digest
    // create the bucket if it does not exist
    let file_bucket    = format!("f{}", &digest[0..2]);
    let file_filename  = format!("{}.{}", &digest, file_ext);

    PgConnection::establish(&database_url)
        .expect(&format!("Error connecting to {}", database_url))
    // create destination path
    let dest = PathBuf::from(content_store)
        .join(file_bucket)
        .join(file_filename);

    // TODO: bad error type ... 
    let bucket_dir = dest.parent().ok_or(ProcessingError::ThumbnailFailed)?;
    fs::create_dir_all(bucket_dir)?;

    // move the file 
    Ok(fs::rename(src_path, &dest)?)
}

// creates a thumbnail in the content store for the specified digest
// this expects an `ImageMeta` structure describing the input.
fn process_image(content_store: &str, digest: &str, buf: &[u8]) {
fn process_image(content_store: &str, digest: &str, buf: &[u8]) -> Result<(), ProcessingError> {
    // create in memory thumbnail
    let image = image::load_from_memory(&buf)
        .expect("could not read image into memory");
    let image = image::load_from_memory(&buf)?;

    let thumb = image.resize(200, 200, image::FilterType::Nearest);
    let thumb_bucket   = format!("t{}", &digest[0..2]);
    let thumb_filename = format!("{}.thumbnail", &digest);
    
    // store them in content store

    let dest = PathBuf::from(content_store)
        .join(thumb_bucket)
        .join(thumb_filename);

    // write thumbnail file
    fs::create_dir_all(dest.parent().unwrap()).expect("could not create thumbnail bucket");
    let mut dest_file = File::create(dest)
        .expect("could not create thumbnail in content store");

    thumb.save(&mut dest_file, image::ImageFormat::JPEG)
        .expect("could not write to thumbnail in content store"); 

    dest_file.flush().expect("could not flush thumbnail to disk");
    // write thumbnail file to disk
    fs::create_dir_all(dest.parent().unwrap())?;
    let mut dest_file = File::create(dest)?;
    thumb.save(&mut dest_file, image::ImageFormat::JPEG)?;
    Ok(dest_file.flush()?)
}