~mht/cmr

03d1605b45d8949caaa36ad991c605891d8f5335 — Martin Hafskjold Thoresen 3 years ago 33b1cbc
Port over list tests from data-structures to crossbeam-skiporder

Tests are running fine? We're still looping in the 80/10/10 benchmark
using cbo::hashmap
M crossbeam-skiporder/Cargo.toml => crossbeam-skiporder/Cargo.toml +1 -0
@@ 5,4 5,5 @@ authors = ["Martin Hafskjold Thoresen <martinhath@gmail.com>"]

[dependencies]
crossbeam-epoch = { path = "../extern-benchmarks/crossbeam-epoch" }
crossbeam-utils = "0.5"
rand = "*"
\ No newline at end of file

M crossbeam-skiporder/src/hashmap.rs => crossbeam-skiporder/src/hashmap.rs +10 -10
@@ 46,7 46,7 @@ where
        };
        let g = &epoch::pin();
        unsafe {
            s._list.insert_front(Entry::Sentinel(0));
            s._list.insert_front(Entry::Sentinel(0), g);
            let sentinel = s._list.head.load(SeqCst, g);
            let a = Atomic::null();
            a.store(sentinel, SeqCst);


@@ 102,7 102,7 @@ where

        'restart: loop {
            let mut entry = list::Entry::from_node_ptr(parent);
            match entry.seek_with(g, |other| sentinel_hash <= other.hash()) {
            match entry.seek_with(|other| sentinel_hash <= other.hash(), g) {
                Ok(_) => unsafe {
                    let curr = entry.current.deref();
                    if curr.data().is_sentinel() && curr.data().hash() == sentinel_hash {


@@ 121,7 121,7 @@ where
                Err(list::Error::Empty) => unsafe {
                    let prev = entry.previous.deref();
                    new_node.deref().next.store(Shared::null(), SeqCst);
                    if let Err(CompareAndSetError { current: _, new: n }) = prev
                    if let Err(CompareAndSetError { current: _, new: _n }) = prev
                        .next
                        .compare_and_set(Shared::null(), new_node, SeqCst, g)
                    {


@@ 130,7 130,7 @@ where
                    break 'restart;
                },
                Err(list::Error::IllegalState) => continue 'restart,
                other => panic!("hashmap::insert_sentinel unreachable"),
                _other => panic!("hashmap::insert_sentinel unreachable"),
            }
        }
        let _ = self.buckets[bucket].compare_and_set(Shared::null(), new_node, SeqCst, g);


@@ 146,10 146,10 @@ where

        'restart: loop {
            let mut entry = list::Entry::from_node_ptr(curr);
            let res = entry.seek_with(g, |e| match e {
            let res = entry.seek_with(|e| match e {
                &Entry::Value((h, ref _key, _)) => h >= rev_hash,
                &Entry::Sentinel(h) => h > rev_hash,
            });
            }, g);

            if let Err(list::Error::Empty) = res {
                // We know that `_prev` is non-null, since the list is not empty.


@@ 209,7 209,7 @@ where
        let mut entry = list::Entry::from_node_ptr(curr);

        // Look for a node that has either a too large hash, or is the node we're looking for.
        let ret = entry.seek_with_opt(g, |data| {
        let ret = entry.seek_with_opt(|data| {
            Some(match data {
                &Entry::Value((h, ref key, _)) => {
                    if h > rev_hash {


@@ 220,7 220,7 @@ where
                }
                _ => false,
            })
        });
        }, g);
        ret.is_ok()
    }



@@ 235,7 235,7 @@ where
        loop {
            let mut entry = list::Entry::from_node_ptr(curr);

            let ret = entry.seek_with_opt(g, |data| {
            let ret = entry.seek_with_opt(|data| {
                if rev_hash < data.hash() {
                    None
                } else {


@@ 244,7 244,7 @@ where
                        _ => Some(false),
                    }
                }
            });
            }, g);
            match ret {
                Err(list::Error::Empty) => return false,
                Err(_) => continue,

M crossbeam-skiporder/src/lib.rs => crossbeam-skiporder/src/lib.rs +1 -0
@@ 1,6 1,7 @@
#![allow(dead_code)]

extern crate crossbeam_epoch;
extern crate crossbeam_utils;

pub mod list;
pub mod hashmap;

M crossbeam-skiporder/src/list.rs => crossbeam-skiporder/src/list.rs +348 -13
@@ 1,8 1,8 @@
use std::fmt::Debug;
use std::mem;
use std::mem::ManuallyDrop;
use std::result::Result as stdResult;
use std::sync;
use std::sync::atomic::Ordering::{self, SeqCst};
use std::sync::atomic::Ordering::{SeqCst};

use self::epoch::{Atomic, CompareAndSetError, Owned, Shared};
use crossbeam_epoch as epoch;


@@ 84,7 84,7 @@ impl<T> Node<T> {
    }
}

impl<T> List<T> {
impl<T: Debug> List<T> {
    pub fn new() -> Self {
        List {
            head: Atomic::null(),


@@ 92,21 92,57 @@ impl<T> List<T> {
    }

    /// Insert the element at the beginning of the list.
    pub fn insert_front(&self, t: T) {
        let p = &epoch::pin();
        let mut new_node = Owned::new(Node::new(t)).into_shared(p);
    pub fn insert_front(&self, t: T, g: &epoch::Guard) {
        let new_node = Owned::new(Node::new(t)).into_shared(g);
        loop {
            let head = self.head.load(SeqCst, p);
            let head = self.head.load(SeqCst, g);
            // This is safe since we haven't published the new node yet.
            unsafe {
                new_node.deref().next.store(head, SeqCst);
            }
            match self.head.compare_and_set(head, new_node, SeqCst, p) {
            match self.head.compare_and_set(head, new_node, SeqCst, g) {
                Ok(_) => return,
                Err(_) => {}
            }
        }
    }

    /// Remove the first element in the list, if any.
    pub fn remove_front(&self, g: &epoch::Guard) -> Result<T> {
        loop {
            let head_ptr = self.head.load(SeqCst, g);
            let head = unsafe { head_ptr.as_ref().ok_or(Empty)? };
            let next = head.next.load(SeqCst, g);
            if self.head.compare_and_set(head_ptr, next, SeqCst, g).is_ok() {
                let data = unsafe { ::std::ptr::read(&head.data) };
                return Ok(Succ::Value(ManuallyDrop::into_inner(data)));
            }
        }
    }

    pub fn entry<'a>(&self, g: &'a epoch::Guard) -> Entry<'a, T> {
        Entry::from_node_ptr(self.head.load(SeqCst, g))
    }

    pub fn count(&self, g: &epoch::Guard) -> usize {
        'outer: loop {
            let mut c = 1;
            let mut e = self.entry(g);
            if e.current().is_none() {
                return 0;
            }
            loop {
                match e.step(g) {
                    Ok(_) => c += 1,
                    Err(Error::IllegalState) | Err(Error::NotConsecutive) | Err(Error::Deleted) => {
                        continue 'outer;
                    }
                    Err(Error::Empty) => return c,
                    Err(Error::LostRace) => {}
                }
            }
        }
    }
}

#[derive(Debug)]


@@ 116,7 152,6 @@ pub struct Entry<'a, T: 'a> {
    pub next: Shared<'a, Node<T>>,
}

use std::fmt::Debug;
impl<'a, T: Debug> Entry<'a, T> {
    pub fn from_node_ptr(ptr: Shared<'a, Node<T>>) -> Self {
        Entry {


@@ 167,7 202,7 @@ impl<'a, T: Debug> Entry<'a, T> {
    ///
    /// TODO: should remove these assumptions. Can return Err(Empty) or something, if null.
    #[inline(always)]
    pub fn insert_between(&self, mut new_node: Shared<Node<T>>, g: &epoch::Guard) -> Result<T> {
    pub fn insert_between(&self, new_node: Shared<Node<T>>, g: &epoch::Guard) -> Result<T> {
        let prev = self.previous;
        let curr = self.current;
        unsafe {


@@ 176,7 211,7 @@ impl<'a, T: Debug> Entry<'a, T> {
                .deref()
                .next
                .compare_and_set(curr.with_tag(0), new_node, SeqCst, g);
            if let Err(CompareAndSetError { current: c, new: n }) = ret {
            if let Err(CompareAndSetError { current: c, new: _n }) = ret {
                if c.tag() != 0 {
                    Err(Deleted)
                } else {


@@ 233,7 268,7 @@ impl<'a, T: Debug> Entry<'a, T> {
    /// Finds the first element in the list for which the predicate is `true`. If found we return
    /// `Ok(Done)`, and `f(entry.value()) == true`.
    #[inline(always)]
    pub fn seek_with<F>(&mut self, g: &'a epoch::Guard, f: F) -> Result<T>
    pub fn seek_with<F>(&mut self, f: F, g: &'a epoch::Guard) -> Result<T>
    where
        F: Fn(&T) -> bool,
    {


@@ 256,7 291,7 @@ impl<'a, T: Debug> Entry<'a, T> {
    /// Try to find a node that satisfy the given predicate. If the given function returns `None`,
    /// abort.
    #[inline(always)]
    pub fn seek_with_opt<F>(&mut self, g: &'a epoch::Guard, mut f: F) -> Result<T>
    pub fn seek_with_opt<F>(&mut self, mut f: F, g: &'a epoch::Guard) -> Result<T>
    where
        F: FnMut(&T) -> Option<bool>,
    {


@@ 277,4 312,304 @@ impl<'a, T: Debug> Entry<'a, T> {
            }
        }
    }

    /// Return a ptr to the current node, if any.
    #[inline(always)]
    pub fn current(&'a self) -> Option<&'a Node<T>> {
        unsafe { self.current.as_ref() }
    }

    /// Return a ptr to the previous node, if any.
    #[inline(always)]
    pub fn previous(&'a self) -> Option<&'a Node<T>> {
        unsafe { self.previous.as_ref() }
    }


    /// Return the value of the node that we are currently looking at. Return `None` if we are not
    /// looking at a node.
    #[inline(always)]
    pub fn value(&'a self) -> Option<&'a T> {
        unsafe { self.current.as_ref().map(|p| p.data() ) }
    }
}

impl<'a, T> Entry<'a, T>
where
    T: PartialEq + Debug,
{
    /// Search for the given element in the list. If found, we return `Ok(Done)`, and
    /// `entry.value() == Some(&t)`.
    fn seek(&mut self, t: &T, g: &'a epoch::Guard) -> Result<T> {
        self.seek_with(|e| e == t, g)
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use ::std::sync::atomic::Ordering;

    #[test]
    fn list_insert_front() {
        let mut l: List<i32> = List::new();
        let g = &epoch::pin();

        for i in 0..10 {
            l.insert_front(i, g);
        }
        for i in 0..10 {
            let e = l.remove_front(g);
            assert_eq!(e, Ok(Succ::Value(9 - i)));
        }
    }

    #[test]
    fn list_entry_value() {
        let mut l: List<i32> = List::new();
        let g = &epoch::pin();

        for i in 0..3 {
            l.insert_front(i, g);
        }

        let mut entry = l.entry(g);

        assert_eq!(entry.value(), Some(&2));
        assert_eq!(entry.value(), Some(&2));
        assert_eq!(entry.step(g), Ok(Done));
        assert_eq!(entry.value(), Some(&1));
        assert_eq!(entry.step(g), Ok(Done));
        assert_eq!(entry.value(), Some(&0));
        assert_eq!(entry.step(g), Err(Empty));
        assert_eq!(entry.value(), None);
    }

    #[test]
    fn list_entry_remove() {
        let mut l: List<i32> = List::new();
        let g = &epoch::pin();

        for i in 0..5 {
            l.insert_front(i, g);
        }

        let mut entry = l.entry(g);

        entry.seek(&2, g).unwrap();
        let n = entry.delete(g).unwrap();
        assert_eq!(n, Done);

        assert_eq!(l.remove_front(g), Ok(Succ::Value(4)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(3)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(1)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(0)));
        assert_eq!(l.remove_front(g), Err(Empty));
    }

    #[test]
    fn list_entry_delete() {
        let mut l: List<i32> = List::new();
        let g = &epoch::pin();

        for i in 0..5 {
            l.insert_front(i, g);
        }

        let mut entry = l.entry(g);

        entry.seek(&2, g).unwrap();
        entry.delete(g).unwrap();

        assert_eq!(l.remove_front(g), Ok(Succ::Value(4)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(3)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(1)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(0)));
        assert_eq!(l.remove_front(g), Err(Empty));
    }

    #[test]
    fn insert_after() {
        let mut l: List<i32> = List::new();
        let g = &epoch::pin();

        for i in 0..5 {
            l.insert_front(i, g);
        }

        let mut entry = l.entry(g);
        let _ = entry.seek(&2, g);

        {
            let node: &Node<_> = &*entry.current().unwrap();
            node.insert_after(9);
        }

        assert_eq!(l.remove_front(g), Ok(Succ::Value(4)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(3)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(2)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(9)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(1)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(0)));
        assert_eq!(l.remove_front(g), Err(Empty));
    }

    #[test]
    fn insert_after_last() {
        let mut l: List<i32> = List::new();
        let g = &epoch::pin();

        l.insert_front(3, g);

        let mut entry = l.entry(g);
        entry.current().unwrap().insert_after(5);

        assert_eq!(l.remove_front(g), Ok(Succ::Value(3)));
        assert_eq!(l.remove_front(g), Ok(Succ::Value(5)));
        assert_eq!(l.remove_front(g), Err(Empty));
    }

    #[test]
    fn entry_test() {
        let mut l: List<i32> = List::new();
        let g = &epoch::pin();

        for i in (0..3).rev() {
            l.insert_front(i, g);
        }

        let mut entry = l.entry(g);

        ::std::thread::sleep_ms(100);

        assert_eq!(entry.value(), Some(&0));
        entry.step(g).unwrap();
        assert_eq!(entry.value(), Some(&1));
        entry.step(g).unwrap();
        assert_eq!(entry.value(), Some(&2));
        let _ = entry.step(g);
        assert_eq!(entry.value(), None);
        let _ = entry.step(g);
        assert_eq!(entry.value(), None);
    }

    #[test]
    fn entry_keeps_stuff_around() {
        let mut l: List<i32> = List::new();
        let g = &epoch::pin();

        for i in (0..5).rev() {
            l.insert_front(i, g);
        }

        let mut entry = l.entry(g);

        // Unlink the first node from the Lists SharedGuard. Since the node is guarded by us, this
        // should be fine.
        l.head.store(epoch::Shared::null(), Ordering::SeqCst);

        for i in 0..5 {
            assert_eq!(entry.value(), Some(&i));
            let _ = entry.step(g);
        }
        assert_eq!(entry.value(), None);
        let _ = entry.step(g);
        assert_eq!(entry.value(), None);
    }

    use crossbeam_utils::thread::scope;
    use std::thread::spawn;

    #[test]
    fn mp_insert_only() {
        let list = &List::new();
        scope(|s| {
            for i in 0..4 {
                s.spawn(move || {
                    let g = &epoch::pin();
                    for j in 0..10 {
                        let n: usize = j * 4 + i;
                        list.insert_front(n, g);
                    }
                });
            }
        });

        let g = &epoch::pin();
        let c = list.count(g);
        assert_eq!(c, 40);
    }

    use std::sync::Arc;

    #[test]
    fn mp_insert_remove() {
        const N: usize = 400;
        const P: usize = 4;

        let list = Arc::new(List::new());
        {
        let g = &epoch::pin();
        for i in 0..N {
            list.insert_front(i, g);
        }
        }

        let handles = (0..P)
            .map(|i| {
                let list = list.clone();
                spawn(move || {
                    let g = &epoch::pin();
                    let insert = i & 1 == 0;
                    for j in 0..(N / P) {
                        if insert {
                            list.insert_front(j, g);
                        } else {
                            let _ = list.remove_front(g);
                        }
                    }
                })
            }).collect::<Vec<_>>();

        for h in handles.into_iter() {
            h.join().unwrap();
        }
    }

    #[test]
    fn mp_entry_seek_remove() {
        const N: usize = 400;
        const P: usize = 4;

        let list = Arc::new(List::new());
        {
        let g = &epoch::pin();
        for i in 0..N {
            list.insert_front(i, g);
        }
        }

        scope(|s| {
            for i in 0..4 {
                let list = list.clone();
                s.spawn(move || {
                    let g = &epoch::pin();
                    for j in 0..100 {
                        let n = j * 4 + i;
                        let mut entry = list.entry(g);
                        loop {
                            match entry.seek(&n, g) {
                                Ok(_) => break,
                                Err(Empty) => assert!(false),
                                Err(_) => continue,
                            }
                            if entry.delete(g).is_ok() {
                                break;
                            }
                        }
                    }
                });
            }
        });
    }
}