~whynothugo/vdirsyncer-rs

cd6c8c3b3b2615e73108c055e82a8ba4d357133e — Hugo Osvaldo Barrera 2 months ago 0931341
Implement synchronisation of collection properties
M vdirsyncer/src/main.rs => vdirsyncer/src/main.rs +6 -2
@@ 146,16 146,20 @@ impl<I: Item> NamedPair<I> {
        info!(">>> Plan for storage pair '{}'", self.name);
        for cp in &plan.collection_plans {
            info!(
                "collection: {}, action: {}. {} item actions.",
                "collection: {}, action: {}. {} item actions. {} property actions.",
                cp.alias(),
                cp.collection_action,
                cp.item_actions.len()
                cp.item_actions.len(),
                cp.property_actions.len(),
            );

            for item in &cp.item_actions {
                info!("item: {}", item);
                debug!("{item:?}");
            }
            for prop in &cp.property_actions {
                info!("property: {:?}", prop);
            }
        }
    }


M vstorage/src/base.rs => vstorage/src/base.rs +1 -6
@@ 608,13 608,8 @@ pub struct FetchedItem<I: Item> {
    pub etag: Etag,
}

pub enum PropertyTarget {
    Collection(Href),
    Item(Href),
}

pub struct ListedProperty<P: Property> {
    pub resource: PropertyTarget,
    pub resource: Href,
    pub property: P,
    pub value: String,
}

M vstorage/src/caldav.rs => vstorage/src/caldav.rs +2 -3
@@ 13,8 13,7 @@ use libdav::sd::BootstrapError;
use libdav::CalDavClient;

use crate::base::{
    CalendarProperty, Collection, FetchedItem, IcsItem, Item, ItemRef, ListedProperty,
    PropertyTarget, Storage,
    CalendarProperty, Collection, FetchedItem, IcsItem, Item, ItemRef, ListedProperty, Storage,
};
use crate::dav::{
    collection_href_for_item, collection_id_for_href, path_for_collection_in_home_set,


@@ 381,7 380,7 @@ where
            .zip(CalendarProperty::known_properties())
            .filter_map(|((_, v), p)| {
                v.map(|value| ListedProperty {
                    resource: PropertyTarget::Collection(collection_href.to_owned()),
                    resource: collection_href.to_owned(),
                    property: p.clone(),
                    value,
                })

M vstorage/src/carddav.rs => vstorage/src/carddav.rs +2 -3
@@ 12,8 12,7 @@ use libdav::dav::{mime_types, WebDavClient};
use libdav::CardDavClient;

use crate::base::{
    AddressBookProperty, Collection, FetchedItem, Item, ItemRef, ListedProperty, PropertyTarget,
    Storage, VcardItem,
    AddressBookProperty, Collection, FetchedItem, Item, ItemRef, ListedProperty, Storage, VcardItem,
};
use crate::dav::{
    collection_href_for_item, collection_id_for_href, path_for_collection_in_home_set,


@@ 369,7 368,7 @@ where
            .zip(AddressBookProperty::known_properties())
            .filter_map(|((_, v), p)| {
                v.map(|value| ListedProperty {
                    resource: PropertyTarget::Collection(collection_href.to_owned()),
                    resource: collection_href.to_owned(),
                    property: p.clone(),
                    value,
                })

M vstorage/src/lib.rs => vstorage/src/lib.rs +11 -0
@@ 70,6 70,17 @@
//!
//! See [`Item`](crate::base::Item).
//!
//! ## Properties
//!
//! Storages expose properties. Property types vary depending on a Storage's items. E.g.: Calendars
//! have a `Colour`, `Description`, `DisplayName` and `Order`, whereas Address Books have
//! `DisplayName` and `Description`. In both of these examples, only collections have properties,
//! and items have no properties.
//!
//! Synchronising Storages with custom `Item` types where items have properties is not yet
//! supported. This limitation means that an implementation trying to synchronise email will
//! synchronise messages but not their properties (e.g.: `Seen`, `Flagged`, etc).
//!
//! ## Entity tags
//!
//! An `Etag` is a value that changes whenever an item has changed in a collection. It is inspired

M vstorage/src/sync/error.rs => vstorage/src/sync/error.rs +14 -1
@@ 8,7 8,7 @@

use super::{
    execute::ExecutionError,
    plan::{CollectionAction, ItemAction},
    plan::{CollectionAction, ItemAction, PropertyAction},
};

/// An error synchronising two items between storages.


@@ 39,6 39,14 @@ impl SyncError {
            error,
        }
    }

    #[must_use]
    pub fn property(action: PropertyAction, error: ExecutionError) -> Self {
        Self {
            action: SomeAction::Property(action),
            error,
        }
    }
}

impl std::fmt::Display for SyncError {


@@ 62,6 70,8 @@ pub enum SomeAction {
        action: CollectionAction,
        alias: String,
    },
    // FIXME: missing details of property. Do I need Property::display ?
    Property(PropertyAction),
}

impl std::fmt::Display for SomeAction {


@@ 73,6 83,9 @@ impl std::fmt::Display for SomeAction {
            SomeAction::Collection { action, alias } => {
                write!(f, "collection action '{action}' for '{alias}'")
            }
            SomeAction::Property(action) => {
                write!(f, "property action '{action}'")
            }
        }
    }
}

M vstorage/src/sync/execute.rs => vstorage/src/sync/execute.rs +77 -2
@@ 7,14 7,14 @@
use log::{debug, error};

use crate::{
    base::{Item, ItemRef, Storage},
    base::{Item, ItemRef, Property, Storage},
    disco::DiscoveredCollection,
    CollectionId, Href,
};

use super::{
    error::SyncError,
    plan::{CollectionAction, CollectionPlan, ItemAction, Plan, ResolvedMapping},
    plan::{CollectionAction, CollectionPlan, ItemAction, Plan, PropertyPlan, ResolvedMapping},
    status::{ItemState, MappingUid, Side, StatusDatabase, StatusError},
};



@@ 239,6 239,7 @@ impl<I: Item> Plan<I> {
            let CollectionPlan {
                collection_action,
                item_actions,
                property_actions,
                mapping,
            } = plan;
            let ResolvedMapping { alias, a, b } = mapping;


@@ 264,6 265,16 @@ impl<I: Item> Plan<I> {
                };
            }

            for prop_action in property_actions {
                if let Err(err) = prop_action
                    // FIXME: won't work for item properties
                    .execute(storage_a, storage_b, status, &mapping_uid, &a.href, &b.href)
                    .await?
                {
                    on_error(SyncError::property(prop_action.action, err));
                };
            }

            match side_to_delete {
                None => {}
                Some(Side::A) => {


@@ 427,3 438,67 @@ async fn check_id_matches_expected<I: Item>(
    }
    Ok(())
}

impl<I: Item> PropertyPlan<I> {
    async fn execute(
        &self,
        a: &dyn Storage<I>,
        b: &dyn Storage<I>,
        status: &StatusDatabase,
        mapping_uid: &MappingUid,
        href_a: &str,
        href_b: &str,
    ) -> Result<Result<(), ExecutionError>, StatusError> {
        match &self.action {
            super::plan::PropertyAction::WriteToA { value } => {
                if let Err(err) = a.set_property(href_a, self.property.clone(), value).await {
                    return Ok(Err(ExecutionError::from(err)));
                };
                status.set_property(mapping_uid, href_a, href_b, &self.property.name(), value)?;
            }
            super::plan::PropertyAction::WriteToB { value } => {
                if let Err(err) = b.set_property(href_b, self.property.clone(), value).await {
                    return Ok(Err(ExecutionError::from(err)));
                };
                status.set_property(mapping_uid, href_a, href_b, &self.property.name(), value)?;
            }
            super::plan::PropertyAction::DeleteInA => {
                if let Err(err) = a.unset_property(href_a, self.property.clone()).await {
                    return Ok(Err(ExecutionError::from(err)));
                };
                status.delete_property(
                    mapping_uid,
                    href_a,
                    href_b,
                    self.property.name().as_str(),
                )?;
            }
            super::plan::PropertyAction::DeleteInB => {
                if let Err(err) = b.unset_property(href_b, self.property.clone()).await {
                    return Ok(Err(ExecutionError::from(err)));
                };
                status.delete_property(
                    mapping_uid,
                    href_a,
                    href_b,
                    self.property.name().as_str(),
                )?;
            }
            super::plan::PropertyAction::ClearStatus => {
                status.delete_property(
                    mapping_uid,
                    href_a,
                    href_b,
                    self.property.name().as_str(),
                )?;
            }
            super::plan::PropertyAction::UpdateStatus { value } => {
                status.set_property(mapping_uid, href_a, href_b, &self.property.name(), value)?;
            }
            super::plan::PropertyAction::Conflict => {
                error!("Conflict for property {}. Skipping.", self.property.name());
            }
        };
        Ok(Ok(()))
    }
}

M vstorage/src/sync/plan.rs => vstorage/src/sync/plan.rs +150 -7
@@ 11,7 11,7 @@ use std::sync::Arc;

use log::{debug, warn};

use crate::base::{FetchedItem, ItemRef, Storage};
use crate::base::{FetchedItem, ItemRef, Property, Storage};
use crate::disco::{DiscoveredCollection, Discovery};
use crate::{base::Item, sync::declare::StoragePair};
use crate::{CollectionId, ErrorKind, Href};


@@ 59,7 59,7 @@ pub enum PlanError {
pub struct Plan<I: Item> {
    pub(super) storage_a: Arc<dyn Storage<I>>,
    pub(super) storage_b: Arc<dyn Storage<I>>,
    pub collection_plans: Vec<CollectionPlan>,
    pub collection_plans: Vec<CollectionPlan<I>>,
}

/// Show details of the plan itself.


@@ 467,19 467,20 @@ fn resolve_mapping_counterpart<I: Item>(

/// Actions required to sync a collection between two storages.
#[derive(Debug)]
pub struct CollectionPlan {
pub struct CollectionPlan<I: Item> {
    pub collection_action: CollectionAction,
    pub item_actions: Vec<ItemAction>,
    pub property_actions: Vec<PropertyPlan<I>>,
    pub(super) mapping: ResolvedMapping,
}

impl CollectionPlan {
impl<I: Item> CollectionPlan<I> {
    /// Calculate actions to sync a collection between two storages.
    async fn new<I: Item>(
    async fn new(
        pair: &StoragePair<I>,
        mapping: ResolvedMapping,
        status: Option<&StatusDatabase>,
    ) -> Result<CollectionPlan, PlanError> {
    ) -> Result<CollectionPlan<I>, PlanError> {
        let (href_a, href_b) = (&mapping.a.href, &mapping.b.href);
        let mapping_uid = status
            .map(|s| s.get_mapping_uid(href_a, href_b))


@@ 520,11 521,27 @@ impl CollectionPlan {
            .collect::<Result<Vec<_>, PlanError>>()?;

        let collection_action =
            CollectionAction::new(mapping.a.exists, mapping.b.exists, mapping_uid);
            CollectionAction::new(mapping.a.exists, mapping.b.exists, mapping_uid.clone());

        // TODO: need to pass items_a and items_b to map Href->UID for item properties.
        let property_actions =
            match PropertyPlan::create_for_collection(pair, &mapping, status, mapping_uid).await {
                Ok(plan) => plan,
                // If a storage doesn't support properties, don't bail, simply no-op.
                Err(err) => 'unsupported: {
                    if let PlanError::Storage(e) = &err {
                        if e.kind == ErrorKind::Unsupported {
                            break 'unsupported Vec::<PropertyPlan<I>>::new();
                        }
                    }
                    return Err(err);
                }
            };

        Ok(CollectionPlan {
            collection_action,
            item_actions,
            property_actions,
            mapping,
        })
    }


@@ 845,3 862,129 @@ async fn items_for_collection<I: Item>(

    Ok(items)
}

#[derive(Debug)]
pub enum PropertyAction {
    WriteToA { value: String },
    WriteToB { value: String },
    DeleteInA,
    DeleteInB,
    ClearStatus,
    UpdateStatus { value: String },
    Conflict,
}

impl std::fmt::Display for PropertyAction {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PropertyAction::WriteToA { value } => write!(f, "Write to a: {value}"),
            PropertyAction::WriteToB { value } => write!(f, "Write to b: {value}"),
            PropertyAction::DeleteInA => write!(f, "Delete in a"),
            PropertyAction::DeleteInB => write!(f, "Delete in b"),
            PropertyAction::ClearStatus => write!(f, "Clear status"),
            PropertyAction::UpdateStatus { .. } => write!(f, "Update status"),
            PropertyAction::Conflict => write!(f, "Conflict"),
        }
    }
}

#[derive(Debug)]
pub struct PropertyPlan<I: Item> {
    pub(super) property: I::Property,
    pub(super) action: PropertyAction,
}

impl<I: Item> PropertyPlan<I> {
    async fn create_for_collection(
        pair: &StoragePair<I>,
        mapping: &ResolvedMapping,
        status: Option<&StatusDatabase>,
        uid: Option<MappingUid>,
    ) -> Result<Vec<PropertyPlan<I>>, PlanError> {
        let props_a = pair.storage_a().list_properties(&mapping.a.href).await?;
        let props_b = pair.storage_b().list_properties(&mapping.b.href).await?;

        let props_status = match (status, uid) {
            (Some(s), Some(u)) => s.list_properties_for_collection(&u)?,
            _ => Vec::new(),
        };

        // FIXME: assumes that no props belong to items.
        let all_props = props_a
            .iter()
            .chain(props_b.iter())
            .map(|p| &p.property)
            .collect::<HashSet<_>>(); // Collecting into HashSet removes duplicates.

        let mut actions = Vec::new();
        for property in all_props {
            // TODO: it is safe to POP from the vec and handle owned data.
            let a = props_a.iter().find(|p| p.property == *property);
            let b = props_b.iter().find(|p| p.property == *property);
            let state = props_status.iter().find(|p| p.property == property.name());

            if let Some(a) = a {
                if *a.resource != mapping.a.href {
                    todo!("Synchronising item properties is not implemented");
                }
            }
            if let Some(b) = b {
                if *b.resource != mapping.b.href {
                    todo!("Synchronising item properties is not implemented");
                }
            }

            let action = match (a, b, state) {
                (None, None, None) => None,
                (None, None, Some(_)) => Some(PropertyAction::ClearStatus),
                (None, Some(b), None) => Some(PropertyAction::WriteToA {
                    value: b.value.clone(),
                }),
                (None, Some(_), Some(_)) => Some(PropertyAction::DeleteInB {}),
                (Some(a), None, None) => Some(PropertyAction::WriteToB {
                    value: a.value.clone(),
                }),
                (Some(_), None, Some(_)) => Some(PropertyAction::DeleteInA {}),
                (Some(a), Some(b), None) => {
                    if a.value == b.value {
                        Some(PropertyAction::UpdateStatus {
                            value: a.value.clone(),
                        })
                    } else {
                        Some(PropertyAction::Conflict)
                    }
                }
                (Some(a), Some(b), Some(s)) => {
                    if a.value == b.value {
                        if s.value == a.value {
                            None
                        } else {
                            Some(PropertyAction::UpdateStatus {
                                value: a.value.clone(),
                            })
                        }
                    } else if a.value == s.value {
                        Some(PropertyAction::WriteToA {
                            value: b.value.clone(),
                        })
                    } else if b.value == s.value {
                        Some(PropertyAction::WriteToB {
                            value: a.value.clone(),
                        })
                    } else {
                        Some(PropertyAction::Conflict)
                    }
                }
            };

            if let Some(action) = action {
                actions.push(PropertyPlan {
                    action,
                    property: property.clone(), // TODO: don't clone
                });
            }
        }

        Ok(actions)
    }
}

M vstorage/src/sync/status.rs => vstorage/src/sync/status.rs +96 -1
@@ 78,6 78,16 @@ pub(super) struct StatusForItem {
    pub(super) href_b: String,
}

pub(super) struct PropertyStatus {
    #[allow(dead_code)] // TODO: will be required for item properties
    pub(super) href_a: String,
    #[allow(dead_code)] // TODO: will be required for item properties
    pub(super) href_b: String,
    // The name of the property
    pub(super) property: String,
    pub(super) value: String,
}

/// A unique ID used for a mapping between two collections.
///
/// This is an opaque identifier, and can only be obtained from a status database.


@@ 179,7 189,25 @@ impl StatusDatabase {
        self.conn
            .execute("CREATE UNIQUE INDEX IF NOT EXISTS by_href ON items(href_b)")?;

        // TODO: table for properties
        self.conn.execute(concat!(
            "CREATE TABLE IF NOT EXISTS properties (",
            " mapping_uid INTEGER NOT NULL,",
            " href_a TEXT NOT NULL,",
            " href_b TEXT NOT NULL,",
            " property TEXT NOT NULL,",
            " value TEXT NOT NULL,",
            " FOREIGN KEY(mapping_uid) REFERENCES collections(uid)",
            ")"
        ))?;
        self.conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS by_uid ON properties(mapping_uid, property)",
        )?;
        self.conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS by_href_a ON properties(href_a, property)",
        )?;
        self.conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS by_href_b ON properties(href_b, property)",
        )?;
        Ok(())
    }



@@ 382,6 410,73 @@ impl StatusDatabase {
        statement.next()?;
        Ok(())
    }

    pub(super) fn list_properties_for_collection(
        &self,
        mapping_uid: &MappingUid,
    ) -> Result<Vec<PropertyStatus>, StatusError> {
        let query = concat!(
            "SELECT href_a, href_b, property, value",
            " FROM properties",
            " WHERE mapping_uid = :mapping_uid"
        );
        let mut statement = self.conn.prepare(query)?;
        statement.bind((":mapping_uid", mapping_uid.0))?;

        let mut results = Vec::new();
        while let Ok(State::Row) = statement.next() {
            results.push(PropertyStatus {
                href_a: statement.read::<String, _>("href_a")?,
                href_b: statement.read::<String, _>("href_b")?,
                property: statement.read::<String, _>("property")?,
                value: statement.read::<String, _>("value")?,
            });
        }
        Ok(results)
    }

    pub(super) fn set_property(
        &self,
        mapping_uid: &MappingUid,
        href_a: &str,
        href_b: &str,
        property: &str,
        value: &str,
    ) -> Result<(), StatusError> {
        let query = concat!(
            "INSERT OR REPLACE INTO properties (mapping_uid, href_a, href_b, property, value) ",
            "VALUES (:mapping_uid, :href_a, :href_b, :property, :value)",
        );
        let mut statement = self.conn.prepare(query)?;
        statement.bind((":mapping_uid", mapping_uid.0))?;
        statement.bind((":href_a", href_a))?;
        statement.bind((":href_b", href_b))?;
        statement.bind((":property", property))?;
        statement.bind((":value", value))?;
        statement.next()?;
        Ok(())
    }

    pub(super) fn delete_property(
        &self,
        mapping_uid: &MappingUid,
        href_a: &str,
        href_b: &str,
        property: &str,
    ) -> Result<(), StatusError> {
        let query = concat!(
            "DELETE FROM properties",
            " WHERE mapping_uid = :mapping_uid AND href_a = :href_a AND href_b = :href_b",
            " AND property = :property",
        );
        let mut statement = self.conn.prepare(query)?;
        statement.bind((":mapping_uid", mapping_uid.0))?;
        statement.bind((":href_a", href_a))?;
        statement.bind((":href_b", href_b))?;
        statement.bind((":property", property))?;
        statement.next()?;
        Ok(())
    }
}

#[cfg(test)]

M vstorage/src/vdir.rs => vstorage/src/vdir.rs +2 -2
@@ 21,7 21,7 @@ use tokio::io::AsyncWriteExt;

use crate::base::{
    AddressBookProperty, CalendarProperty, Collection, FetchedItem, Item, ItemRef, ListedProperty,
    PropertyTarget, Storage,
    Storage,
};
use crate::disco::{DiscoveredCollection, Discovery};
use crate::{CollectionId, Error, ErrorKind, Etag, Href, Result};


@@ 287,7 287,7 @@ where
            let prop_value = self.get_property(collection_href, property.clone()).await?;
            if let Some(value) = prop_value {
                props.push(ListedProperty {
                    resource: PropertyTarget::Collection(collection_href.to_owned()),
                    resource: collection_href.to_owned(),
                    property: property.clone(),
                    value,
                });