~mht/cmr

25287db86c9f9ae0277d4ca723a902729f7a7115 — Martin Hafskjold Thoresen 3 years ago 37c55e5
Fix the bug!

The bug was that in `remove` we gave it to crossbeam before really removing it
from the list. This was fine in CMR, since we did reachability from the head.

(nodes could be marked as deleted in the list, but not really removed)
2 files changed, 113 insertions(+), 63 deletions(-)

M crossbeam-skiporder/src/hashmap.rs
M crossbeam-skiporder/src/list.rs
M crossbeam-skiporder/src/hashmap.rs => crossbeam-skiporder/src/hashmap.rs +10 -3
@@ 99,12 99,14 @@ where

        let new_node =
            Owned::new(list::Node::new(Entry::Sentinel(sentinel_hash))).into_shared(g);
        ::std::sync::atomic::fence(SeqCst); // TODO(remove)

        'restart: loop {
            let mut entry = list::Entry::from_node_ptr(parent);
            match entry.seek_with(|other| sentinel_hash <= other.hash(), g) {
                Ok(_) => unsafe {
                    let curr = entry.current.deref();
                Ok(_) => {
                    let curr = unsafe { entry.current.deref() };
                    // Someone else inserted the same sentinel before us. Help out by casing the bucket ptr.
                    if curr.data().is_sentinel() && curr.data().hash() == sentinel_hash {
                        let _ = self.buckets[bucket].compare_and_set(
                            Shared::null(),


@@ 112,6 114,8 @@ where
                            SeqCst,
                            g,
                        );
                        // Drop our node, since it was never inserted into the list at all.
                        unsafe { new_node.into_owned() };
                        return;
                    }
                    if entry.insert_between(new_node, g).is_ok() {


@@ 133,7 137,10 @@ where
                _other => panic!("hashmap::insert_sentinel unreachable"),
            }
        }
        let _ = self.buckets[bucket].compare_and_set(Shared::null(), new_node, SeqCst, g);
        // If this fails, it should be because someone else helped us by CASing it to `new_node`.
        if let Err(CompareAndSetError { current: c, new: _n}) = self.buckets[bucket].compare_and_set(Shared::null(), new_node, SeqCst, g) {
            assert_eq!(c, new_node);
        }
    }

    pub fn insert(&self, k: K, v: V, g: &epoch::Guard) {

M crossbeam-skiporder/src/list.rs => crossbeam-skiporder/src/list.rs +103 -60
@@ 2,7 2,7 @@ use std::fmt::Debug;
use std::mem;
use std::mem::ManuallyDrop;
use std::result::Result as stdResult;
use std::sync::atomic::Ordering::{SeqCst};
use std::sync::atomic::Ordering::SeqCst;

use self::epoch::{Atomic, CompareAndSetError, Owned, Shared};
use crossbeam_epoch as epoch;


@@ 169,7 169,8 @@ impl<'a, T: Debug> Entry<'a, T> {
        }
        let curr: &Node<T> = unsafe { self.current.deref() };
        let next: Shared<Node<T>> = curr.next.load(SeqCst, p);
        if next == self.current { // sanity check which should never fire! (node pointing to itself)
        if next == self.current {
            // sanity check which should never fire!
            panic!("Node points to self!");
        }



@@ 217,24 218,34 @@ impl<'a, T: Debug> Entry<'a, T> {
    pub fn insert_between(&self, new_node: Shared<Node<T>>, g: &epoch::Guard) -> Result<T> {
        let prev = self.previous;
        let curr = self.current;
        unsafe {
        let ret = unsafe {
            // This deref is safe, since we are using SeqCst, and we know that the node is valid.
            new_node.deref().next.store(curr.with_tag(0), SeqCst);
            let ret = prev
                .deref()
            // This is safe, for the same reasons.
            prev.deref()
                .next
                .compare_and_set(curr.with_tag(0), new_node, SeqCst, g);
            if let Err(CompareAndSetError { current: c, new: _n }) = ret {
                if c.tag() != 0 {
                    Err(Deleted)
                } else {
                    Err(LostRace)
                }
                .compare_and_set(curr.with_tag(0), new_node, SeqCst, g)
        };
        if let Err(CompareAndSetError {
            current: c,
            new: _n,
        }) = ret
        {
            if c.tag() != 0 {
                Err(Deleted)
            } else {
                Ok(Done)
                Err(LostRace)
            }
        } else {
            Ok(Done)
        }
    }

    /// Will return `Err(IllegalState)` if `current` or `previous` is `null`, or if `previous` is being deleted.
    /// Will return `Err(NotConsecutive)` if some node has been inserted in between `previous` and `current`.
    /// Will return `Err(Deleted)` if the node was deleted by someone else.
    /// Will return `Err(LostRace)` if we couldn't mark `current` as deleted
    /// Will return `Ok(Done)` if the node was successfully marked (we don't care if the node was actually removed).
    pub fn delete(&self, p: &epoch::Guard) -> Result<T> {
        let curr = self.current;
        if curr.is_null() {


@@ 248,33 259,38 @@ impl<'a, T: Debug> Entry<'a, T> {
            panic!("Entry::delete, but curr == prev! We cannot delete a node without first have stepped at least once!");
        }

        unsafe {
            let next = curr.deref().next.load(SeqCst, p);
            if next.tag() != 0 {
                let ret = prev
                    .deref()
                    .next
                    .compare_and_set(curr, next.with_tag(0), SeqCst, p);
                if let Err(CompareAndSetError { current: c, new: _ }) = ret {
                    if c.tag() != 0 {
                        return Err(IllegalState);
                    }
                    return Err(NotConsecutive);
                }
                return Err(Deleted);
            }
        let curr_node = unsafe { curr.deref() };
        let prev_node = unsafe { prev.deref() };

            let res = curr
                .deref()
        let next = curr_node.next.load(SeqCst, p);
        if next.tag() != 0 { // We are being deleted by someone!
            // Try to swing us out
            let ret = prev_node
                .next
                .compare_and_set(next, next.with_tag(1), SeqCst, p);
            if res.is_err() {
                return Err(LostRace);
                .compare_and_set(curr, next.with_tag(0), SeqCst, p);
            if let Err(CompareAndSetError { current: c, new: _ }) = ret {
                // Failed, and prev is also being deleted
                if c.tag() != 0 {
                    return Err(IllegalState);
                }
                // Someone was inserted in between us.
                return Err(NotConsecutive);
            }
            p.defer_unchecked(move || drop(curr.into_owned()));
            let _ = prev.deref().next.compare_and_set(curr, next, SeqCst, p);
            Ok(Done)
            unsafe {p.defer_unchecked(move || drop(curr.into_owned()));}
            return Err(Deleted);
        }

        let res = curr_node
            .next
            .compare_and_set(next, next.with_tag(1), SeqCst, p);
        if res.is_err() {
            return Err(LostRace);
        }
        let res = prev_node.next.compare_and_set(curr, next, SeqCst, p);
        if res.is_ok() {
            unsafe {p.defer_unchecked(move || drop(curr.into_owned()));}
        }
        Ok(Done)
    }

    /// Finds the first element in the list for which the predicate is `true`. If found we return


@@ 284,17 300,27 @@ impl<'a, T: Debug> Entry<'a, T> {
    where
        F: Fn(&T) -> bool,
    {
        use ::std::collections::HashMap;
        // let mut seen = HashMap::new();
        use std::collections::HashMap;
        let mut seen = HashMap::new();
        let mut iter = 0;
        loop {
            if self.current.is_null() {
                return Err(Empty);
            }
            // let previous = seen.insert(self.current.as_raw() as usize, iter);
            // if let Some(prev_iter) = previous {
            //     panic!("This value has already been seen! (so we're looping) {}/{}", prev_iter, iter);
            // }
            let previous = seen.insert(self.current.as_raw() as usize, iter);
            if let Some(prev_iter) = previous {
                eprintln!("previous node at {:x}: {:#?}",
                          self.previous.as_raw() as usize,
                          unsafe { self.previous.deref() });
                eprintln!("current node at {:x}: {:#?}",
                          self.current.as_raw() as usize,
                          unsafe { self.current.deref() });
                let curr = unsafe { self.current.deref() };
                eprintln!("next node at {:x}: {:#?}",
                          curr.next.load(SeqCst, g).as_raw() as usize,
                          unsafe { curr.next.load(SeqCst, g).deref() });
                panic!("This value has already been seen! (so we're looping) {}/{}", prev_iter, iter);
            }
            let curr = unsafe { self.current.deref() };
            if f(curr.data()) {
                return Ok(Done);


@@ 302,6 328,9 @@ impl<'a, T: Debug> Entry<'a, T> {

            match self.step(g) {
                e @ Err(IllegalState) => return e,
                Err(_) => {
                    seen.remove(&(self.current.as_raw() as usize));
                }
                _ => {}
            }
            iter += 1;


@@ 315,17 344,29 @@ impl<'a, T: Debug> Entry<'a, T> {
    where
        F: FnMut(&T) -> Option<bool>,
    {
        use ::std::collections::HashMap;
        // let mut seen = HashMap::new();
        use std::collections::HashMap;
        let mut seen = HashMap::new();
        let mut iter = 0;
        loop {
            if self.current.is_null() {
                return Err(Empty);
            }
            // let previous = seen.insert(self.current.as_raw() as usize, iter);
            // if let Some(prev_iter) = previous {
            //     panic!("This value has already been seen! (so we're looping) {}/{}", prev_iter, iter);
            // }

            let previous = seen.insert(self.current.as_raw() as usize, iter);
            if let Some(prev_iter) = previous {
                eprintln!("previous node at {:x}: {:#?}",
                          self.previous.as_raw() as usize,
                          unsafe { self.previous.deref() });
                eprintln!("current node at {:x}: {:#?}",
                          self.current.as_raw() as usize,
                          unsafe { self.current.deref() });
                let curr = unsafe { self.current.deref() };
                eprintln!("next node at {:x}: {:#?}",
                          curr.next.load(SeqCst, g).as_raw() as usize,
                          unsafe { curr.next.load(SeqCst, g).deref() });
                panic!("This value has already been seen! (so we're looping) {}/{}", prev_iter, iter);
            }

            let curr = unsafe { self.current.deref() };
            match f(curr.data()) {
                Some(true) => return Ok(Done),


@@ 334,7 375,10 @@ impl<'a, T: Debug> Entry<'a, T> {
            }

            match self.step(g) {
                e @ Err(_) => return e,
                e @ Err(IllegalState) => return e,
                Err(_) => {
                    seen.remove(&(self.current.as_raw() as usize));
                }
                _ => {}
            }
            iter += 1;


@@ 353,12 397,11 @@ impl<'a, T: Debug> Entry<'a, T> {
        unsafe { self.previous.as_ref() }
    }


    /// Return the value of the node that we are currently looking at. Return `None` if we are not
    /// looking at a node.
    #[inline(always)]
    pub fn value(&'a self) -> Option<&'a T> {
        unsafe { self.current.as_ref().map(|p| p.data() ) }
        unsafe { self.current.as_ref().map(|p| p.data()) }
    }
}



@@ 376,7 419,7 @@ where
#[cfg(test)]
mod test {
    use super::*;
    use ::std::sync::atomic::Ordering;
    use std::sync::atomic::Ordering;

    #[test]
    fn list_insert_front() {


@@ 577,10 620,10 @@ mod test {

        let list = Arc::new(List::new());
        {
        let g = &epoch::pin();
        for i in 0..N {
            list.insert_front(i, g);
        }
            let g = &epoch::pin();
            for i in 0..N {
                list.insert_front(i, g);
            }
        }

        let handles = (0..P)


@@ 611,10 654,10 @@ mod test {

        let list = Arc::new(List::new());
        {
        let g = &epoch::pin();
        for i in 0..N {
            list.insert_front(i, g);
        }
            let g = &epoch::pin();
            for i in 0..N {
                list.insert_front(i, g);
            }
        }

        scope(|s| {