~bitfehler/vomit-sync

9bc666b50feaa2ee20b83aee864eb81989978d10 — Conrad Hoffmann 8 months ago aa8464d
refactor: split state cache storage for subdirs

Currently, the subdir state, which is only present/needed in the root
directory, is part of the regular state cache. It is, however, quite
special and has a very different access pattern, and special
requirements on when changes should be made/persisted.

To facilitate state handling, split out this special root state into its
own file, `.vmtdirstate`, whic will only be present in the maildir root.
To avoid concurrency issues, the file will only be updated by a special
thread that collects results from the worker threads.

Split out this way, the new synchronization with the worker threads
ensures that the directory state is always updated at the right time,
regardless of updates to the regular state in the root directory.
Previously, updates to the directories were sometimes written
prematurely (due to changes in the regular state), which could then lead
to data loss if the current operation errored out or was aborted.

The change is sort of backwards compatible. Vsync will start adding
subdirectories to the new root state file as they get processed (i.e.
there are actual changes to process). However, the absence of a
subdirectory from the file only means the remote mailbox won't get
deleted if the local one does, so there is no hurry to add them. Once
the inbox is being processed, the regular state will be saved without
the `subdirs` entry.
4 files changed, 92 insertions(+), 123 deletions(-)

M resources/test/maildir/.vmtsyncstate
M src/lib.rs
M src/mailboxes.rs
M src/state.rs
M resources/test/maildir/.vmtsyncstate => resources/test/maildir/.vmtsyncstate +0 -1
@@ 3,7 3,6 @@ uid = 23
uid_validity = 42
highest_mod_seq = 555
localtime = 1676475686182
subdirs = ["dav", "lists.srht-dev"]

[uids]
"1672951180.M85393503P1063671V65024I8011842.serotonin,S=334513" = [23, ""]

M src/lib.rs => src/lib.rs +46 -16
@@ 28,6 28,7 @@ use std::fs;
use std::io;
use std::iter::Iterator;
use std::path::PathBuf;
use std::sync::mpsc;
use std::thread;
use std::time::Instant;
use thiserror::Error;


@@ 287,6 288,7 @@ fn worker_thread(
    opts: &SyncOptions,
    direction: SyncDirection,
    rx: spmc::Receiver<mailboxes::SyncJob>,
    tx: mpsc::Sender<mailboxes::SyncJob>,
) -> Result<(), SyncError> {
    let mut imap_session = new_session(opts)?;
    while let Ok(job) = rx.recv() {


@@ 295,15 297,32 @@ fn worker_thread(
            format!("Synced {}", job.name),
            sync_mailbox(&opts.local, &job, &direction, &mut imap_session)?
        );
        tx.send(job)?;
    }
    drop(tx);
    imap_session.logout()?;
    Ok(())
}

fn root_state_thread(
    root_dir: String,
    rx_results: mpsc::Receiver<mailboxes::SyncJob>,
) -> Result<(), SyncError> {
    trace!("root state thread managing state in {}", root_dir);
    let mut root_state = state::RootState::load(&root_dir)?;
    while let Ok(job) = rx_results.recv() {
        // update root state if needed
        trace!("root state update: {:?} {}", job.action, job.name);
        mailboxes::sync_done(job, &mut root_state);
        root_state.save()?;
    }
    Ok(())
}

fn sync_(opts: &SyncOptions, direction: SyncDirection) -> Result<(), SyncError> {
    info!("Syncing from {}", opts.remote);

    let mut root_state = state::RootState::load(&opts.local)?;
    let root_state = state::RootState::load(&opts.local)?;

    let mut imap_session = new_session(opts)?;



@@ 362,22 381,22 @@ fn sync_(opts: &SyncOptions, direction: SyncDirection) -> Result<(), SyncError> 
                match direction {
                    SyncDirection::Pull => {
                        // Delete locally, also from root state
                        mailboxes::sync_delete_local(name, delimiter, &mut root_state)
                        mailboxes::sync_delete_local(name, delimiter)
                    }
                    SyncDirection::Push => {
                        // Create remotely, add to root state
                        mailboxes::sync_create_remote(name, delimiter, &mut root_state)
                        mailboxes::sync_create_remote(name, delimiter)
                    }
                    SyncDirection::TwoWay => {
                        let dirname = Mailbox::virtual_to_dir(&name, &delimiter);
                        if root_state.subdirs.contains(&dirname) {
                            // Exists locally only because removed on server side
                            // Delete locally, also from root state
                            mailboxes::sync_delete_local(name, delimiter, &mut root_state)
                            mailboxes::sync_delete_local(name, delimiter)
                        } else {
                            // Exists locally only because created here
                            // Create remotely, add to root state
                            mailboxes::sync_create_remote(name, delimiter, &mut root_state)
                            mailboxes::sync_create_remote(name, delimiter)
                        }
                    }
                }


@@ 385,22 404,22 @@ fn sync_(opts: &SyncOptions, direction: SyncDirection) -> Result<(), SyncError> 
                match direction {
                    SyncDirection::Pull => {
                        // Create locally (already happening), add to root state (after sync?)
                        mailboxes::sync_create_local(name, delimiter, &mut root_state)
                        mailboxes::sync_create_local(name, delimiter)
                    }
                    SyncDirection::Push => {
                        // Delete remotely, also from root state
                        mailboxes::sync_delete_remote(name, delimiter, &mut root_state)
                        mailboxes::sync_delete_remote(name, delimiter)
                    }
                    SyncDirection::TwoWay => {
                        let dirname = Mailbox::virtual_to_dir(&name, &delimiter);
                        if root_state.subdirs.contains(&dirname) {
                            // Exists remotely only because removed locally
                            // Delete remotely, also from root state
                            mailboxes::sync_delete_remote(name, delimiter, &mut root_state)
                            mailboxes::sync_delete_remote(name, delimiter)
                        } else {
                            // Exists remotely only because created there
                            // Create locally (already happening), add to root state (after sync?)
                            mailboxes::sync_create_local(name, delimiter, &mut root_state)
                            mailboxes::sync_create_local(name, delimiter)
                        }
                    }
                }


@@ 443,7 462,6 @@ fn sync_(opts: &SyncOptions, direction: SyncDirection) -> Result<(), SyncError> 
        return Err(SyncError::DangerousActionError());
    }

    root_state.save()?;
    drop(root_state);

    let thread_count = if sync_jobs.len() < opts.threads.into() {


@@ 454,7 472,16 @@ fn sync_(opts: &SyncOptions, direction: SyncDirection) -> Result<(), SyncError> 
    };

    let mut threads = Vec::new();
    let (mut tx, rx) = spmc::channel::<mailboxes::SyncJob>();
    let (mut tx_work, rx_work) = spmc::channel::<mailboxes::SyncJob>();
    let (tx_result, rx_result) = mpsc::channel::<mailboxes::SyncJob>();

    let root_dir = opts.local.clone();
    let t = thread::spawn(move || {
        if let Err(e) = root_state_thread(root_dir, rx_result) {
            error!("Error in root state thread: {}", e);
        };
    });
    threads.push(t);

    info!(
        "Using {} threads to sync {} mailboxes",


@@ 462,29 489,32 @@ fn sync_(opts: &SyncOptions, direction: SyncDirection) -> Result<(), SyncError> 
        sync_jobs.len()
    );
    for i in 1..thread_count {
        let rx = rx.clone();
        let rx = rx_work.clone();
        let tx = tx_result.clone();
        let opts = (*opts).clone();
        let dir = direction.clone();

        let t = thread::spawn(move || {
            if let Err(e) = worker_thread(i, &opts, dir, rx) {
            if let Err(e) = worker_thread(i, &opts, dir, rx, tx) {
                error!("Error in worker thread {}: {}", i, e);
            };
        });
        threads.push(t);
    }
    for job in sync_jobs {
        tx.send(job)?;
        tx_work.send(job)?;
    }
    drop(tx);
    drop(tx_work);

    while let Ok(job) = rx.recv() {
    while let Ok(job) = rx_work.recv() {
        debug!("Syncing {} in main thread ({:?})", job.name, job.action);
        measure!(
            format!("Synced {}", job.name),
            sync_mailbox(&opts.local, &job, &direction, &mut imap_session)?
        );
        tx_result.send(job)?;
    }
    drop(tx_result);

    _ = imap_session.logout();


M src/mailboxes.rs => src/mailboxes.rs +13 -28
@@ 56,13 56,7 @@ pub fn load_remote(names: &ExtendedNames) -> BTreeMap<String, u64> {
        .collect()
}

pub(crate) fn sync_create_local(
    name: String,
    delimiter: String,
    state: &mut state::RootState,
) -> Option<SyncJob> {
    let dirname = vomit::Mailbox::virtual_to_dir(&name, &delimiter);
    state.subdirs.insert(dirname);
pub(crate) fn sync_create_local(name: String, delimiter: String) -> Option<SyncJob> {
    Some(SyncJob {
        name,
        delimiter,


@@ 70,13 64,7 @@ pub(crate) fn sync_create_local(
    })
}

pub(crate) fn sync_delete_local(
    name: String,
    delimiter: String,
    state: &mut state::RootState,
) -> Option<SyncJob> {
    let dirname = vomit::Mailbox::virtual_to_dir(&name, &delimiter);
    state.subdirs.remove(&dirname);
pub(crate) fn sync_delete_local(name: String, delimiter: String) -> Option<SyncJob> {
    Some(SyncJob {
        name,
        delimiter,


@@ 84,13 72,7 @@ pub(crate) fn sync_delete_local(
    })
}

pub(crate) fn sync_create_remote(
    name: String,
    delimiter: String,
    state: &mut state::RootState,
) -> Option<SyncJob> {
    let dirname = vomit::Mailbox::virtual_to_dir(&name, &delimiter);
    state.subdirs.insert(String::from(&dirname));
pub(crate) fn sync_create_remote(name: String, delimiter: String) -> Option<SyncJob> {
    Some(SyncJob {
        name,
        delimiter,


@@ 98,13 80,7 @@ pub(crate) fn sync_create_remote(
    })
}

pub(crate) fn sync_delete_remote(
    name: String,
    delimiter: String,
    state: &mut state::RootState,
) -> Option<SyncJob> {
    let dirname = vomit::Mailbox::virtual_to_dir(&name, &delimiter);
    state.subdirs.remove(&dirname);
pub(crate) fn sync_delete_remote(name: String, delimiter: String) -> Option<SyncJob> {
    Some(SyncJob {
        name,
        delimiter,


@@ 119,3 95,12 @@ pub fn sync(name: String, delimiter: String) -> Option<SyncJob> {
        action: SyncAction::Sync,
    })
}

pub(crate) fn sync_done(job: SyncJob, state: &mut state::RootState) {
    let dirname = vomit::Mailbox::virtual_to_dir(&job.name, &job.delimiter);
    match job.action {
        SyncAction::CreateLocal | SyncAction::CreateRemote => state.subdirs.insert(dirname),
        SyncAction::DeleteLocal | SyncAction::DeleteRemote => state.subdirs.remove(&dirname),
        SyncAction::Sync => state.subdirs.insert(dirname),
    };
}

M src/state.rs => src/state.rs +33 -78
@@ 2,7 2,6 @@ use chrono::prelude::*;
use maildir::Maildir;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::ffi::OsStr;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};


@@ 92,8 91,6 @@ pub(crate) struct LastSeen {
    /// The highest mod sequence seen for this mailbox
    pub highest_mod_seq: u64,
    pub localtime: Option<i64>,
    /// The known (maildir) subdirectories of this maildir
    pub subdirs: BTreeSet<String>,
}

// maildir id, flags


@@ 114,15 111,13 @@ pub(crate) struct SyncState {
    pub local_changes: LocalChanges,
}

/// Trimmed-down representation of [SyncState] for mailbox handling only
/// A state needed in the root of a maildir hierarchy
///
/// It gets loaded from and saved to the same file, but only exposes the
/// known subdirectories. This is enough to determine the mailboxes that
/// need to be synced but avoids some costly processing of the individual
/// emails' state during loading.
/// Keeps track of subfolders.
#[derive(Debug, Deserialize, Serialize)]
pub(crate) struct RootState {
    #[serde(skip)]
    dir: PathBuf,
    on_disk: SyncStateOnDisk,
    pub subdirs: BTreeSet<String>,
}



@@ 142,21 137,7 @@ impl SyncStateOnDisk {
    const FNAME: &str = ".vmtsyncstate";

    /// Create a new SyncStateOnDisk structure for when no state was present on-disk
    fn new(path: &impl AsRef<OsStr>) -> SyncStateOnDisk {
        let path: &OsStr = path.as_ref();
        // find all subdirs
        let maildir = Maildir::from(PathBuf::from(path));
        let submds: BTreeSet<String> = maildir
            .list_subdirs()
            .filter_map(|md| md.ok())
            .filter_map(|md| {
                md.path()
                    .file_name()
                    .and_then(|f| f.to_str().map(vomit::Mailbox::dir_to_name))
                    .map(|n| n.to_string())
            })
            .collect();

    fn new() -> SyncStateOnDisk {
        SyncStateOnDisk {
            uids: BTreeMap::new(),
            last_seen: LastSeen {


@@ 164,7 145,6 @@ impl SyncStateOnDisk {
                uid_validity: 0,
                highest_mod_seq: 0,
                localtime: Some(0),
                subdirs: submds,
            },
        }
    }


@@ 182,7 162,7 @@ impl SyncStateOnDisk {
                    return Err(StateError::IOError(e));
                }
                // If state file didn't exist, return empty state
                Ok(SyncStateOnDisk::new(&dir))
                Ok(SyncStateOnDisk::new())
            }
        }
    }


@@ 200,22 180,38 @@ impl SyncStateOnDisk {
}

impl RootState {
    /// Load the state of the given directory as [RootState]
    pub(crate) fn load(dir: &impl AsRef<Path>) -> Result<Self, StateError> {
        let ssod = SyncStateOnDisk::load(dir)?;
        let subdirs = ssod.last_seen.subdirs.clone();
    const FNAME: &str = ".vmtdirstate";

        Ok(RootState {
            dir: PathBuf::from(dir.as_ref()),
            on_disk: ssod,
            subdirs,
        })
    /// Load [RootState] from the given directory
    pub(crate) fn load(dir: &impl AsRef<Path>) -> Result<Self, StateError> {
        let dir: &Path = dir.as_ref();
        let name = Path::new(RootState::FNAME);
        let path: PathBuf = [dir, name].iter().collect();
        match fs::read_to_string(&path) {
            Ok(s) => {
                let mut r: Self = toml::from_str(&s)?;
                r.dir = path;
                Ok(r)
            }
            Err(e) => {
                if e.kind() != io::ErrorKind::NotFound {
                    eprintln!("error accessing state file {:?}", &path);
                    return Err(StateError::IOError(e));
                }
                // If state file didn't exist, return empty state
                Ok(RootState {
                    dir: path,
                    subdirs: BTreeSet::new(),
                })
            }
        }
    }

    /// Save the state back to disk
    pub fn save(&mut self) -> Result<(), StateError> {
        self.on_disk.last_seen.subdirs = self.subdirs.clone();
        self.on_disk.save(&self.dir)
        let toml = toml::to_string(self)?;
        fs::write(&self.dir, toml)?;
        Ok(())
    }
}



@@ 496,45 492,4 @@ mod tests {
        assert!(state2.has_local_changes());
        assert_eq!(state1.uids, state2.uids);
    }

    #[test]
    fn test_load_and_save_root_immutable() {
        let dir = tempdir().unwrap();
        setup(&dir);

        let mut state1 = RootState::load(&dir).unwrap();
        assert_eq!(state1.subdirs.len(), 2);
        state1.save().unwrap();

        let state2 = SyncState::load(&dir).unwrap();
        assert!(state2.has_local_changes());
        assert_eq!(state2.uids.len(), 1);
    }

    #[test]
    fn test_root_state() {
        let dir = tempdir().unwrap();
        setup(&dir);

        let mut root_state = RootState::load(&dir).unwrap();
        assert_eq!(root_state.subdirs.len(), 2);

        // Modify subdirs and save
        root_state.subdirs.pop_first().unwrap();
        root_state.save().unwrap();
        drop(root_state);

        // Reload
        let state = SyncState::load(&dir).unwrap();
        assert_eq!(state.last_seen.subdirs.len(), 1);

        // Assert nothing else changed
        assert_eq!(state.last_seen.uid, 23);
        assert_eq!(state.last_seen.uid_validity, 42);
        assert_eq!(state.last_seen.highest_mod_seq, 555);
        assert!(state.uids.contains_key(&23));
        assert_eq!(state.uids.len(), 1);

        dir.close().unwrap();
    }
}