~hime/aqua

e316d7aaf984e40ef8e14c05e63c7731e70d4a1c — Robbie Straw 7 years ago d426581
moving file type detection around

- Created a `util::processing` module which includes
  - mime type detection for videos (ffprobe)
  - mime type detection for images (hand rolled matchers)
  - image thumbnailing (`image` crate)
  - video thumbnailing (`video` crate)

Also I moved `hash_file` from the controllers prelude to `util::processing` in anticipation of a
later refactoring. Due to long compile times I'm thinking that "core" libraries (anything not reliant
on aqua-web/diesel) will be moved to a separate compilation unit.
M src/bin/aqua_watch.rs => src/bin/aqua_watch.rs +13 -231
@@ 22,7 22,6 @@
//

#[macro_use] extern crate log;
#[macro_use] extern crate serde_derive;

extern crate aqua;
extern crate clap;


@@ 34,99 33,22 @@ extern crate notify;
extern crate serde;
extern crate serde_json;

use aqua::controllers::prelude::hash_file;
use aqua::models::{Entry, NewEntry};
use aqua::schema;
use aqua::util::processing::{ProcessingError, ProcessingResult};
use clap::{Arg, App};
use diesel::prelude::*;
use diesel::pg::PgConnection;
use dotenv::dotenv;
use notify::{DebouncedEvent, Watcher, RecursiveMode, watcher};
use std::{env, fmt, process};
use std::collections::HashMap;
use std::convert::From;
use std::{env, fs};
use std::error::Error;
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Write};
use std::fs::OpenOptions;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::sync::mpsc::channel;
use std::time::Duration;

#[derive(Debug)]
enum ProcessingError {
    DigestFailed,
    DetectionFailed,
    ThumbnailFailed,

    DbConnErr(diesel::ConnectionError),
    DbQueryErr(diesel::result::Error),
    IoErr(io::Error),
    ImageErr(image::ImageError),
    Misc(Box<Error>),
}

impl Error for ProcessingError {
    fn description(&self) -> &str {
        match *self {
            // internal errors
            ProcessingError::DigestFailed      => "Unhandled error while generating SHA256 digest",
            ProcessingError::DetectionFailed   => "The file's type could not be detected",
            ProcessingError::ThumbnailFailed   => "The thumbnail could not be generated",

            // external errors
            ProcessingError::DbConnErr(ref inner)  => inner.description(),
            ProcessingError::DbQueryErr(ref inner) => inner.description(),
            ProcessingError::IoErr(ref inner)      => inner.description(),
            ProcessingError::ImageErr(ref inner)   => inner.description(),
            ProcessingError::Misc(ref inner)       => inner.description(),
        }
    }

    fn cause(&self) -> Option<&Error> {
        match *self {
            ProcessingError::DbConnErr(ref err)  => Some(err),
            ProcessingError::DbQueryErr(ref err) => Some(err),
            ProcessingError::IoErr(ref err)      => Some(err),
            ProcessingError::ImageErr(ref err)   => Some(err),
            ProcessingError::Misc(ref err)       => Some(err.as_ref()),
            _ => None,
        }
    }
}

impl fmt::Display for ProcessingError {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        match *self {
            // TODO: better display impl.
            _ => write!(f, "{}", self.description()),
        }
    }
}

impl From<diesel::ConnectionError> for ProcessingError {
    fn from(err: diesel::ConnectionError) -> Self { ProcessingError::DbConnErr(err) }
}

impl From<diesel::result::Error> for ProcessingError {
    fn from(err: diesel::result::Error) -> Self { ProcessingError::DbQueryErr(err) }
}

impl From<image::ImageError> for ProcessingError {
    fn from(err: image::ImageError) -> Self { ProcessingError::ImageErr(err) }
}

impl From<io::Error> for ProcessingError {
    fn from(err: io::Error) -> Self { ProcessingError::IoErr(err) }
}

impl From<serde_json::Error> for ProcessingError {
    fn from(err: serde_json::Error) -> Self { ProcessingError::Misc(Box::new(err)) }
}

impl From<std::string::FromUtf8Error> for ProcessingError {
    fn from(err: std::string::FromUtf8Error) -> Self { ProcessingError::Misc(Box::new(err)) }
}

fn main() {
    dotenv().expect("must provide .env file, see README (TODO: haha jk)");
    env_logger::init().expect("could not initialize console logging");


@@ 177,10 99,8 @@ fn main() {
}

// TODO: check that file doesn't exist before moving it ...
fn handle_new_file(path: PathBuf, content_store: &str) -> Result<(), ProcessingError> {
    let digest = hash_file(path.as_path())
        .ok_or(ProcessingError::DigestFailed)?;

fn handle_new_file(path: PathBuf, content_store: &str) -> ProcessingResult<()> {
    let digest = aqua::util::processing::hash_file(path.as_path())?;
    let mut file = OpenOptions::new()
        .read(true)
        .write(false)


@@ 192,18 112,18 @@ fn handle_new_file(path: PathBuf, content_store: &str) -> Result<(), ProcessingE
    file.read_to_end(&mut buf)?;

    // TODO: move_file() & db() is probably going to be common to all handlers?
    if let Some(image_metadata) = aqua::util::mime_detect(&buf) {
    if let Some(image_metadata) = aqua::util::processing::detect_image(&buf) {
        info!("got an image ...");
        process_image(content_store, &digest, &buf)?;
        aqua::util::processing::thumb_image(content_store, &digest, &buf)?;
        move_file(path.as_path(), content_store, &digest, image_metadata.extension())?;

        let db_entry = create_db_entry(&digest, image_metadata.mime())?;
        info!("inserted: {:?} into database", db_entry);

        Ok(())
    } else if let Some(ffmpeg_metadata) = ffmpeg_detect(path.as_path())? {
    } else if let Some(ffmpeg_metadata) = aqua::util::processing::detect_video(path.as_path())? {
        info!("got an video ...");
        process_video(content_store, &digest, &path)?;
        aqua::util::processing::thumb_video(content_store, &digest, &path)?;
        move_file(path.as_path(), content_store, &digest, ffmpeg_metadata.ext)?;

        let db_entry = create_db_entry(&digest, ffmpeg_metadata.mime)?;


@@ 215,87 135,7 @@ fn handle_new_file(path: PathBuf, content_store: &str) -> Result<(), ProcessingE
    }
}

#[derive(Debug, Serialize, Deserialize)]
struct FFProbeResult {
    format: Option<FFProbeFormat>,
    streams: Option<Vec<FFProbeStream>>,
}

#[derive(Debug, Serialize, Deserialize)]
struct FFProbeFormat {
    filename:    String,
    nb_streams:  i32,
    format_name: String,
    start_time:  String, // NOTE: these appear to be fixed decimal
    duration:    String, // NOTE: these appear to be fixed decimal
    size:        String, // NOTE: appears to be an integer
    bit_rate:    String, // NOTE: appears to be an integer
    probe_score: i32,    // NOTE: accuracy of the detection? (???)

    tags: HashMap<String, String>, // NOTE: not sure this is correct type
}

#[derive(Debug, Serialize, Deserialize)]
struct FFProbeStream {
    codec_name: String,
    codec_type: String,
}

struct FFProbeMeta {
    pub mime: &'static str,
    pub ext:  &'static str,
}

fn ffmpeg_detect(path: &Path) -> Result<Option<FFProbeMeta>, ProcessingError> {
    let ffprobe_cmd = process::Command::new("ffprobe")
        .arg("-v").arg("quiet")            // silence debug output
        .arg("-hide_banner")               // don't print ffmpeg configuration
        .arg("-print_format").arg("json") // serialize to json
        .arg("-show_format")              // display format data
        .arg("-show_streams")             // display stream data
        .arg("-i").arg(path.as_os_str())  // set the input to current file
        .output()?;

    let json_str = String::from_utf8(ffprobe_cmd.stdout)?;
    let probe_result: FFProbeResult = serde_json::from_str(&json_str)?;
    info!("got result: {:?}", probe_result);

    // see if ffprobe was able to determine the file type ...
    let probe_format = match probe_result.format {
        Some(format_data) => format_data,
        None => return Ok(None),
    };

    let probe_streams = match probe_result.streams {
        Some(stream_data) => stream_data,
        None => return Ok(None),
    };

    // welp ... guess there's nothing to thumbnail (!!!)
    info!("got format: {:?}", probe_format);
    info!("got streams: {:?}", probe_streams);

    let number_of_videos = probe_streams
        .iter()
        .filter(|el| el.codec_type == "video")
        .count();

    if number_of_videos <= 0 { return Err(ProcessingError::DetectionFailed) }

    // TODO: how do we correlate format_name w/ stream & stream position?
    // TODO: I believe this should be matching on containers (which is what will be moved
    //       to the content store; and therefore what will be played back ...)
    //      
    let meta_data = if probe_format.format_name.contains("matroska") {
        Some(FFProbeMeta { mime: "video/x-matroska", ext: "mkv" })
    } else if probe_format.format_name.contains("mp4") {
        Some(FFProbeMeta { mime: "video/mp4", ext: "mp4" })
    } else { None };
    
    Ok(meta_data)
}

fn establish_connection() -> Result<PgConnection, ProcessingError> {
fn establish_connection() -> ProcessingResult<PgConnection> {
    let database_url = env::var("DATABASE_URL")
        .expect("DATABASE_URL not set in `.env` file !!!");



@@ 303,7 143,7 @@ fn establish_connection() -> Result<PgConnection, ProcessingError> {
}

// create entry in database
fn create_db_entry(digest: &str, mime_ty: &str) -> Result<Entry, ProcessingError> {
fn create_db_entry(digest: &str, mime_ty: &str) -> ProcessingResult<Entry> {
    let pg_conn = establish_connection()?;
    let aqua_entry = NewEntry { hash: &digest, mime: Some(&mime_ty) };
    let entry: Result<Entry, diesel::result::Error> = diesel::insert(&aqua_entry)


@@ 314,7 154,7 @@ fn create_db_entry(digest: &str, mime_ty: &str) -> Result<Entry, ProcessingError
}

// moves the file from `src_path` to the `content_store` based on its digest
fn move_file(src_path: &Path, content_store: &str, digest: &str, file_ext: &str) -> Result<(), ProcessingError> {
fn move_file(src_path: &Path, content_store: &str, digest: &str, file_ext: &str) -> ProcessingResult<()> {
    // carve out a bucket based on first byte of SHA256 digest
    // create the bucket if it does not exist
    let file_bucket    = format!("f{}", &digest[0..2]);


@@ 332,61 172,3 @@ fn move_file(src_path: &Path, content_store: &str, digest: &str, file_ext: &str)
    // move the file 
    Ok(fs::rename(src_path, &dest)?)
}

// creates a thumbnail in the content store for the specified digest
// this expects an `ImageMeta` structure describing the input.
fn process_image(content_store: &str, digest: &str, buf: &[u8]) -> Result<(), ProcessingError> {
    // create in memory thumbnail
    let image = image::load_from_memory(&buf)?;

    let thumb = image.resize(200, 200, image::FilterType::Nearest);
    let thumb_bucket   = format!("t{}", &digest[0..2]);
    let thumb_filename = format!("{}.thumbnail", &digest);
    
    // store them in content store
    let dest = PathBuf::from(content_store)
        .join(thumb_bucket)
        .join(thumb_filename);

    // write thumbnail file to disk
    let bucket_dir = dest.parent().ok_or(ProcessingError::ThumbnailFailed)?;
    fs::create_dir_all(bucket_dir)?;
    let mut dest_file = File::create(&dest)?;
    thumb.save(&mut dest_file, image::ImageFormat::JPEG)?;
    Ok(dest_file.flush()?)
}

fn process_video(content_store: &str, digest: &str, src: &Path) -> Result<(), ProcessingError> {
    let thumb_bucket   = format!("t{}", &digest[0..2]);
    let thumb_filename = format!("{}.thumbnail", &digest);

    // store them in content store
    let dest = PathBuf::from(content_store)
        .join(thumb_bucket)
        .join(thumb_filename);

    // TODO: seems weird to have "mjpeg" in here... but I couldn't find any other
    //       JPEG muxer/encoder in my ffmpeg install ...
    //
    // write thumbnail file to disk
    let bucket_dir = dest.parent().ok_or(ProcessingError::ThumbnailFailed)?;
    fs::create_dir_all(bucket_dir)?;
    let ffmpeg_cmd = process::Command::new("ffmpeg")
        .arg("-i").arg(src.as_os_str())            // the input file
        .arg("-vf").arg("thumbnail,scale=200:200") // have ffmpeg seek for a "thumbnail"
        .arg("-frames:v").arg("1")                 // take a single frame
        .arg("-f").arg("mjpeg")                    // save it as jpeg
        .arg(dest.as_path().as_os_str())           // into the content store
        .output()?;

    debug!("ffmpeg stderr: {}", String::from_utf8_lossy(&ffmpeg_cmd.stderr));
    debug!("ffmpeg stdout: {}", String::from_utf8_lossy(&ffmpeg_cmd.stdout));

    info!("digest? {}", digest);
    info!("dest exists? {:?} => {}", dest, dest.is_file());

    match dest.is_file() {
        true  => Ok(()),
        false => Err(ProcessingError::ThumbnailFailed),
    }
}

M src/controllers/entries.rs => src/controllers/entries.rs +2 -2
@@ 117,7 117,7 @@ pub fn submit(conn: &mut plug::Conn) {
        .and_then(|form| extract_file(form, "upload"));

    let digest = file_upload.as_ref()
            .and_then(|file| hash_file(file.path.as_path()));
            .and_then(|file| util::processing::hash_file(file.path.as_path()).ok());

    // TODO: these are gross b/c we can't return anything, thus there's no good
    //       way to use Result+`try!` ...


@@ 162,7 162,7 @@ fn write_entry(conn: &mut plug::Conn, digest: String, file: SavedFile) {
    // read into temp buffer
    let mut buf = vec![];
    let file_ty = match file.read_to_end(&mut buf) {
        Ok(_size) => util::mime_detect(&buf[..]),
        Ok(_size) => util::processing::detect_image(&buf[..]),
        Err(_msg) => { conn.send_resp(500, "could not read your upload..."); return },
    };


M src/controllers/prelude.rs => src/controllers/prelude.rs +0 -23
@@ 1,15 1,9 @@
pub use std::io::Cursor;
pub use conduit::{Request, Response, WriteBody};

use std::borrow::Borrow;
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::path::Path;

use aqua_web::mw::forms::{MultipartForm, FormField, SavedFile};
use crypto::digest::Digest;
use crypto::sha2::Sha256;

/// Send an `200 OK` response w/ mime: `TEXT/HTML`
pub fn respond_html<B>(body: B) -> Response 


@@ 29,20 23,3 @@ pub fn extract_file(form: &mut MultipartForm, field: &str) -> Option<SavedFile> 
        None    => { warn!("file expected, but not present"); None },
    }
}

pub fn hash_file(path: &Path) -> Option<String> {
    let mut buf = vec![];

    info!("path exists? {}",  (path.borrow()).exists());
    info!("path is file? {}", (path.borrow()).is_file());

    File::open(path)
         .and_then(|mut file| { file.read_to_end(&mut buf) })
         .map(|size| {

        debug!("read {} bytes into digest", size);
        let mut digest = Sha256::new();
        digest.input(&mut buf);
        digest.result_str()
    }).ok()
}

M src/util/mod.rs => src/util/mod.rs +1 -64
@@ 1,68 1,5 @@
use image::ImageFormat;

pub mod db;
pub mod processing;
pub mod template;
pub mod timer;
pub mod try_file;

#[derive(Copy, Clone)]
pub struct ImageMeta {
   ext: &'static str,
   fmt: ImageFormat,
}

impl ImageMeta {
    // NOTE: sorry, this table *must* be built at compile time.
    //       I don't feel like fighting the borrowck on this one.
    pub fn from(extension: &'static str, format: ImageFormat) -> ImageMeta {
        ImageMeta {
            ext: extension,
            fmt: format,
        }
    }

    /// Fetches this image's file extension
    pub fn extension(&self) -> &'static str { self.ext }

    /// Fetches this image's format enumeration
    pub fn format(&self) -> ImageFormat { self.fmt }

    /// Fetches the MIME type (for use w/ content-type)
    pub fn mime(&self) -> &'static str {
        match self.fmt {
            ImageFormat::BMP  => "image/bmp",
            ImageFormat::GIF => "image/gif",
            ImageFormat::HDR  => "image/x-hdr",
            ImageFormat::ICO  => "image/x-icon",
            ImageFormat::JPEG => "image/jpeg",
            ImageFormat::PNG => "image/png",
            ImageFormat::PPM => "image/x-portable-pixmap",
            ImageFormat::TGA => "image/tga",
            ImageFormat::TIFF => "image/tiff",
            ImageFormat::WEBP => "image/webp",
        }
    }
}

// TODO: moar formats, MOAR!
pub fn mime_detect(data: &[u8]) -> Option<ImageMeta> {
    // OFFSET   MATCHER             MIME_TYPE
    let mime_table: Vec<(usize, &'static [u8], ImageMeta)> = vec![
        (0,     &b"BM"[..],         ImageMeta::from("bmp",  ImageFormat::BMP)  ),
        (0,     &b"GIF87a"[..],     ImageMeta::from("gif",  ImageFormat::GIF)  ),
        (0,     &b"GIF89a"[..],     ImageMeta::from("gif",  ImageFormat::GIF)  ),
        (0,     &b"#?RADIANCE"[..], ImageMeta::from("hdr",  ImageFormat::HDR)  ),
        (0,     &b"\0\0\x01\0"[..], ImageMeta::from("ico",  ImageFormat::ICO)  ),
        (0,     &b"\xff\xd8"[..],   ImageMeta::from("jpg",  ImageFormat::JPEG) ),
        (0,     &b"\x89PNG"[..],    ImageMeta::from("png",  ImageFormat::PNG)  ),
        (0,     &b"MM.*"[..],       ImageMeta::from("tiff", ImageFormat::TIFF) ),
        (0,     &b"II*."[..],       ImageMeta::from("tiff", ImageFormat::TIFF) ),
    ];

    // see if file matches a header descriptor we know...
    for &(offset, matcher, file_ty) in &mime_table {
        if data[offset..].starts_with(matcher) { return Some(file_ty) }
    }

    None
}

A src/util/processing/image_detector.rs => src/util/processing/image_detector.rs +97 -0
@@ 0,0 1,97 @@
use image::{self, ImageFormat};
use std::fs::{self, File};
use std::io::Write;
use std::path::PathBuf;
use super::{ProcessingError, ProcessingResult};

/// ImageMeta stores mappings of common image filetypes to their associated
/// MIME type and typical file extension. This is useful in processing files 
/// which either have no filename, or files where the filename provided by the 
/// client is considered "untrusted."
///
/// At the moment image detection is done using a table of offsets & magic bytes.
/// Only formats supported by the `image` library are detected.
#[derive(Copy, Clone)]
pub struct ImageMeta {
   ext: &'static str,
   fmt: ImageFormat,
}

impl ImageMeta {
    // NOTE: sorry, this table *must* be built at compile time.
    //       I don't feel like fighting the borrowck on this one.
    pub fn from(extension: &'static str, format: ImageFormat) -> ImageMeta {
        ImageMeta {
            ext: extension,
            fmt: format,
        }
    }

    /// Fetches this image's file extension
    pub fn extension(&self) -> &'static str { self.ext }

    /// Fetches this image's format enumeration
    pub fn format(&self) -> ImageFormat { self.fmt }

    /// Fetches the MIME type (for use w/ content-type)
    pub fn mime(&self) -> &'static str {
        match self.fmt {
            ImageFormat::BMP  => "image/bmp",
            ImageFormat::GIF => "image/gif",
            ImageFormat::HDR  => "image/x-hdr",
            ImageFormat::ICO  => "image/x-icon",
            ImageFormat::JPEG => "image/jpeg",
            ImageFormat::PNG => "image/png",
            ImageFormat::PPM => "image/x-portable-pixmap",
            ImageFormat::TGA => "image/tga",
            ImageFormat::TIFF => "image/tiff",
            ImageFormat::WEBP => "image/webp",
        }
    }
}

// TODO: moar formats, MOAR!
pub fn mime_detect(data: &[u8]) -> Option<ImageMeta> {
    // OFFSET   MATCHER             MIME_TYPE
    let mime_table: Vec<(usize, &'static [u8], ImageMeta)> = vec![
        (0,     &b"BM"[..],         ImageMeta::from("bmp",  ImageFormat::BMP)  ),
        (0,     &b"GIF87a"[..],     ImageMeta::from("gif",  ImageFormat::GIF)  ),
        (0,     &b"GIF89a"[..],     ImageMeta::from("gif",  ImageFormat::GIF)  ),
        (0,     &b"#?RADIANCE"[..], ImageMeta::from("hdr",  ImageFormat::HDR)  ),
        (0,     &b"\0\0\x01\0"[..], ImageMeta::from("ico",  ImageFormat::ICO)  ),
        (0,     &b"\xff\xd8"[..],   ImageMeta::from("jpg",  ImageFormat::JPEG) ),
        (0,     &b"\x89PNG"[..],    ImageMeta::from("png",  ImageFormat::PNG)  ),
        (0,     &b"MM.*"[..],       ImageMeta::from("tiff", ImageFormat::TIFF) ),
        (0,     &b"II*."[..],       ImageMeta::from("tiff", ImageFormat::TIFF) ),
    ];

    // see if file matches a header descriptor we know...
    for &(offset, matcher, file_ty) in &mime_table {
        if data[offset..].starts_with(matcher) { return Some(file_ty) }
    }

    None
}

// creates a thumbnail in the content store for the specified digest
// this expects an `ImageMeta` structure describing the input.
pub fn process_image(content_store: &str, digest: &str, buf: &[u8]) -> ProcessingResult<()> {
    // create in memory thumbnail
    let image = image::load_from_memory(&buf)?;

    let thumb = image.resize(200, 200, image::FilterType::Nearest);
    let thumb_bucket   = format!("t{}", &digest[0..2]);
    let thumb_filename = format!("{}.thumbnail", &digest);
    
    // store them in content store
    let dest = PathBuf::from(content_store)
        .join(thumb_bucket)
        .join(thumb_filename);

    // write thumbnail file to disk
    let bucket_dir = dest.parent().ok_or(ProcessingError::ThumbnailFailed)?;
    fs::create_dir_all(bucket_dir)?;
    let mut dest_file = File::create(&dest)?;
    thumb.save(&mut dest_file, image::ImageFormat::JPEG)?;
    Ok(dest_file.flush()?)
}

A src/util/processing/mod.rs => src/util/processing/mod.rs +120 -0
@@ 0,0 1,120 @@
use crypto::digest::Digest;
use crypto::sha2::Sha256;
use diesel;
use image;
use serde_json;
use std::{self, fmt, io};
use std::borrow::Borrow;
use std::convert::From;
use std::error::Error;
use std::fs::File;
use std::io::Read;
use std::path::Path;

mod image_detector;
mod video_detector;

// public detection & thumbnailing exports
pub use self::image_detector::mime_detect   as detect_image;
pub use self::image_detector::process_image as thumb_image;
pub use self::video_detector::ffmpeg_detect as detect_video;
pub use self::video_detector::process_video as thumb_video;

/// Reads a file from the specified path and returns its SHA256 digest.
pub fn hash_file(path: &Path) -> ProcessingResult<String> {
    let mut buf = vec![];

    info!("path exists? {}",  (path.borrow()).exists());
    info!("path is file? {}", (path.borrow()).is_file());

    let digest = File::open(path)
         .and_then(|mut file| { file.read_to_end(&mut buf) })
         .map(|size| {

        debug!("read {} bytes into digest", size);
        let mut digest = Sha256::new();
        digest.input(&mut buf);
        digest.result_str()
    })?;

    Ok(digest)
}



pub type ProcessingResult<T> = Result<T, ProcessingError>;

#[derive(Debug)]
pub enum ProcessingError {
    DigestFailed,
    DetectionFailed,
    ThumbnailFailed,

    DbConnErr(diesel::ConnectionError),
    DbQueryErr(diesel::result::Error),
    IoErr(io::Error),
    ImageErr(image::ImageError),
    Misc(Box<Error>),
}

impl Error for ProcessingError {
    fn description(&self) -> &str {
        match *self {
            // internal errors
            ProcessingError::DigestFailed      => "Unhandled error while generating SHA256 digest",
            ProcessingError::DetectionFailed   => "The file's type could not be detected",
            ProcessingError::ThumbnailFailed   => "The thumbnail could not be generated",

            // external errors
            ProcessingError::DbConnErr(ref inner)  => inner.description(),
            ProcessingError::DbQueryErr(ref inner) => inner.description(),
            ProcessingError::IoErr(ref inner)      => inner.description(),
            ProcessingError::ImageErr(ref inner)   => inner.description(),
            ProcessingError::Misc(ref inner)       => inner.description(),
        }
    }

    fn cause(&self) -> Option<&Error> {
        match *self {
            ProcessingError::DbConnErr(ref err)  => Some(err),
            ProcessingError::DbQueryErr(ref err) => Some(err),
            ProcessingError::IoErr(ref err)      => Some(err),
            ProcessingError::ImageErr(ref err)   => Some(err),
            ProcessingError::Misc(ref err)       => Some(err.as_ref()),
            _ => None,
        }
    }
}

impl fmt::Display for ProcessingError {
    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
        match *self {
            // TODO: better display impl.
            _ => write!(f, "{}", self.description()),
        }
    }
}

impl From<diesel::ConnectionError> for ProcessingError {
    fn from(err: diesel::ConnectionError) -> Self { ProcessingError::DbConnErr(err) }
}

impl From<diesel::result::Error> for ProcessingError {
    fn from(err: diesel::result::Error) -> Self { ProcessingError::DbQueryErr(err) }
}

impl From<image::ImageError> for ProcessingError {
    fn from(err: image::ImageError) -> Self { ProcessingError::ImageErr(err) }
}

impl From<io::Error> for ProcessingError {
    fn from(err: io::Error) -> Self { ProcessingError::IoErr(err) }
}

impl From<serde_json::Error> for ProcessingError {
    fn from(err: serde_json::Error) -> Self { ProcessingError::Misc(Box::new(err)) }
}

impl From<std::string::FromUtf8Error> for ProcessingError {
    fn from(err: std::string::FromUtf8Error) -> Self { ProcessingError::Misc(Box::new(err)) }
}

A src/util/processing/video_detector.rs => src/util/processing/video_detector.rs +128 -0
@@ 0,0 1,128 @@
use serde_json;
use std::{fs, process};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use super::ProcessingError;

#[derive(Debug, Serialize, Deserialize)]
pub struct FFProbeResult {
    format: Option<FFProbeFormat>,
    streams: Option<Vec<FFProbeStream>>,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct FFProbeFormat {
    filename:    String,
    nb_streams:  i32,
    format_name: String,
    start_time:  String, // NOTE: these appear to be fixed decimal
    duration:    String, // NOTE: these appear to be fixed decimal
    size:        String, // NOTE: appears to be an integer
    bit_rate:    String, // NOTE: appears to be an integer
    probe_score: i32,    // NOTE: accuracy of the detection? (???)

    tags: HashMap<String, String>, // NOTE: not sure this is correct type
}

#[derive(Debug, Serialize, Deserialize)]
pub struct FFProbeStream {
    codec_name: String,
    codec_type: String,
}

#[derive(Debug)]
pub struct FFProbeMeta {
    pub mime: &'static str,
    pub ext:  &'static str,
}

/// This function uses the system installation of `ffprobe` to detect the following:
///   - Does the file have (at least) one video stream?
///   - Which container format was detected?
///
/// The container format is then mapped to a common mime & extension which is used
/// by other parts of the `aqua` application suite to determine how an asset should
/// be displayed.
pub fn ffmpeg_detect(path: &Path) -> Result<Option<FFProbeMeta>, ProcessingError> {
    let ffprobe_cmd = process::Command::new("ffprobe")
        .arg("-v").arg("quiet")            // silence debug output
        .arg("-hide_banner")               // don't print ffmpeg configuration
        .arg("-print_format").arg("json") // serialize to json
        .arg("-show_format")              // display format data
        .arg("-show_streams")             // display stream data
        .arg("-i").arg(path.as_os_str())  // set the input to current file
        .output()?;

    let json_str = String::from_utf8(ffprobe_cmd.stdout)?;
    let probe_result: FFProbeResult = serde_json::from_str(&json_str)?;
    info!("got result: {:?}", probe_result);

    // see if ffprobe was able to determine the file type ...
    let probe_format = match probe_result.format {
        Some(format_data) => format_data,
        None => return Ok(None),
    };

    let probe_streams = match probe_result.streams {
        Some(stream_data) => stream_data,
        None => return Ok(None),
    };

    // welp ... guess there's nothing to thumbnail (!!!)
    info!("got format: {:?}", probe_format);
    info!("got streams: {:?}", probe_streams);

    let number_of_videos = probe_streams
        .iter()
        .filter(|el| el.codec_type == "video")
        .count();

    if number_of_videos <= 0 { return Err(ProcessingError::DetectionFailed) }

    // TODO: how do we correlate format_name w/ stream & stream position?
    // TODO: I believe this should be matching on containers (which is what will be moved
    //       to the content store; and therefore what will be played back ...)
    //      
    let meta_data = if probe_format.format_name.contains("matroska") {
        Some(FFProbeMeta { mime: "video/x-matroska", ext: "mkv" })
    } else if probe_format.format_name.contains("mp4") {
        Some(FFProbeMeta { mime: "video/mp4", ext: "mp4" })
    } else { None };
    
    Ok(meta_data)
}

pub fn process_video(content_store: &str, digest: &str, src: &Path) -> super::ProcessingResult<()> {
    let thumb_bucket   = format!("t{}", &digest[0..2]);
    let thumb_filename = format!("{}.thumbnail", &digest);

    // store them in content store
    let dest = PathBuf::from(content_store)
        .join(thumb_bucket)
        .join(thumb_filename);

    // TODO: seems weird to have "mjpeg" in here... but I couldn't find any other
    //       JPEG muxer/encoder in my ffmpeg install ...
    //
    // write thumbnail file to disk
    let bucket_dir = dest.parent().ok_or(ProcessingError::ThumbnailFailed)?;
    fs::create_dir_all(bucket_dir)?;
    let ffmpeg_cmd = process::Command::new("ffmpeg")
        .arg("-i").arg(src.as_os_str())            // the input file
        .arg("-vf").arg("thumbnail,scale=200:200") // have ffmpeg seek for a "thumbnail"
        .arg("-frames:v").arg("1")                 // take a single frame
        .arg("-f").arg("mjpeg")                    // save it as jpeg
        .arg(dest.as_path().as_os_str())           // into the content store
        .output()?;

    debug!("ffmpeg stderr: {}", String::from_utf8_lossy(&ffmpeg_cmd.stderr));
    debug!("ffmpeg stdout: {}", String::from_utf8_lossy(&ffmpeg_cmd.stdout));

    info!("digest? {}", digest);
    info!("dest exists? {:?} => {}", dest, dest.is_file());

    match dest.is_file() {
        true  => Ok(()),
        false => Err(ProcessingError::ThumbnailFailed),
    }
}