//! # CMR
//!
//! This is a mark-and-sweep garbage collector optimized for concurrent applications.
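//!
//! A minimal usage sketch (not run as a doctest), stitched together from the tests and benchmarks
//! further down in this file; the paths assume the crate is used as `cmr`:
//!
//! ```ignore
//! cmr::global_init();        // once per process: spawns the background thread
//! cmr::thread_activate();    // once per thread that uses the system
//!
//! guard!(g);                 // stack-allocated guard, acting as a root for reachability
//! let p: cmr::Ptr<u64> = cmr::alloc(g, 42);
//! // The allocation behind `p` stays valid for as long as some guard protects it.
//!
//! cmr::thread_deactivate();  // before the thread shuts down
//! ```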

#![feature(alloc_system, alloc_jemalloc, allocator_api)]
#![feature(asm)]
#![feature(const_fn)]
#![feature(const_vec_new)]
#![feature(nll)]
#![feature(optin_builtin_traits)]
#![feature(raw)]
#![feature(specialization)]
#![feature(test)]
#![feature(try_from)]

extern crate alloc_system;
extern crate jemallocator;

#[cfg(not(feature = "system-allocator"))]
#[global_allocator]
static ALLOC: alloc::WrappedAllocator = alloc::WrappedAllocator;

#[cfg(feature = "system-allocator")]
#[global_allocator]
static ALLOC: alloc::WrappedSystemAllocator = alloc::WrappedSystemAllocator;

#[macro_use]
extern crate lazy_static;
extern crate byteorder;
extern crate libc;
extern crate memmap;
extern crate parking_lot;
extern crate tid;

extern crate jemalloc_free_hack;

use std::cell::{RefCell, UnsafeCell};
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::Cursor;
use std::iter::once;
use std::raw::TraitObject;
use std::sync::atomic::Ordering::*;
use std::sync::atomic::{compiler_fence, fence, AtomicBool, AtomicPtr, AtomicUsize};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex as StdMutex;
use std::sync::RwLock;

use byteorder::NativeEndian;
use byteorder::{ReadBytesExt, WriteBytesExt};
use parking_lot::Once;
use tid::Timer;

use signalvec::SignalVec;

const MARKER: u64 = !0;

/// Macro to make a stack-allocated `Guard`. This should be used rather than a constructor for
/// `Guard`, as the macro makes it impossible to `move` the guard, which is illegal (but cannot
/// yet be enforced by the compiler).
/// Macros are dumb, so we have to have this above all `mod`s, or else it won't be visible to them.
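///
/// A minimal sketch (not run as a doctest), mirroring the benchmarks at the bottom of this file:
///
/// ```ignore
/// guard!(g);
/// let _: &mut cmr::Guard<u64> = g; // `g` lives on the current stack frame and is registered
/// ```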
#[macro_export]
macro_rules! guard {
    ($var:ident) => {
        let $var = unsafe { &mut $crate::guard::Guard::new() };
        $var.register();
    };
}

/// Makes multiple `Guard`s at the same time.
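///
/// A minimal sketch (not run as a doctest):
///
/// ```ignore
/// guards!(g0, g1, g2);
/// // Each of `g0`, `g1`, `g2` is a `&mut Guard<T>`, registered in one bulk operation.
/// ```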
#[macro_export]
macro_rules! guards {
    ($($var:ident),+) => {
        $(let $var = unsafe { &mut $crate::guard::Guard::new() };)+
        $crate::guard::Guard::register_bulk(&[
            $($var,)+
        ]);
    }
}

pub mod alloc;
pub mod ptr;
pub mod guard;
pub mod test;

mod signalvec;

pub use alloc::{without_reclamation, Arc, Box, Rc};
pub use ptr::{Atomic, NullablePtr, Ptr, Trace};
pub use guard::{Guard, SharedGuard};
// TODO: rename the `atomic` module to `ptr`

// TODO: put this somewhere else?
fn black_box<T>(dummy: T) -> T {
    // we need to "use" the argument in some way LLVM can't introspect.
    unsafe { asm!("" : : "r"(&dummy)) }
    dummy
}

// Return a thread-unique identifier
fn thread_id() -> usize {
    unsafe { libc::pthread_self() as usize }
}

/// Guard the address in `a` using the guard `guard`. The returned `NullablePtr` wraps the loaded
/// address, which is null if the address in `a` was zero.
/// TODO: Ordering
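///
/// A minimal sketch (not run as a doctest); `head` is a hypothetical `Atomic<u64>` owned by some
/// shared data structure:
///
/// ```ignore
/// guard!(g);
/// // Load the current pointer out of `head` and protect it through `g`.
/// let p: cmr::NullablePtr<u64> = cmr::guard(g, &head);
/// ```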
pub fn guard<'a, T>(guard: &'a mut Guard<T>, a: &Atomic<T>) -> NullablePtr<'a, T> {
    #[cfg(feature = "noop")]
    {
        let p = unsafe { a.load(SeqCst) };
        guard.inner = ptr::raw(p);
        return p;
    }
    without_reclamation_repeat(|| {
        let p = unsafe { a.load(SeqCst) };
        guard.inner = ptr::raw(p);
        p
    })
}

/// Allocate `t` on the heap, and put the resulting address into `guard`. Return a `Ptr` to the
/// allocated data.
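///
/// A minimal sketch (not run as a doctest), following the `guard_protects` test below:
///
/// ```ignore
/// guard!(g);
/// let p: cmr::Ptr<usize> = cmr::alloc(g, 123);
/// let addr: usize = cmr::ptr::raw(p); // the heap address of the new allocation
/// ```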
pub fn alloc<T: Trace>(guard: &mut Guard<T>, t: T) -> Ptr<T> {
    #[cfg(feature = "noop")]
    {
        let ptr = alloc::alloc(t);
        guard.inner = ptr::addr(ptr);
        return ptr;
    }
    let ptr = alloc::alloc(t);
    guard.inner = ptr::addr(ptr);
    alloc::register(ptr);
    ptr
}

/// The number of bytes we `mmap` for IPC. This should be a multiple of 8, as all data we pass
/// through it is either pointers or pointer counts.
/// TODO: Set this to something reasonable.
const MMAP_SIZE: usize = 1024 * 1024 * 1024;

thread_local! {
    /// A flag set to `true` in the signal handler, which is read in `atomic()`. This ensures that
    /// no pointer can be freed before we have registered it. If we get signaled in the middle of
    /// `atomic`, we simply restart.
    pub static WAS_SIGNALED: UnsafeCell<bool> = UnsafeCell::new(false);
    /// This is a hack: `atomic` doesn't support being called recursively. This is a runtime check
    /// to identify such cases.
    pub static IS_IN_ATOMIC: UnsafeCell<bool> = UnsafeCell::new(false);
    /// Initialize the system when a thread first creates a guard.
    pub static INITIALIZER: parking_lot::Once = parking_lot::ONCE_INIT;
}

/// A mutex, to ensure that only one thread is consolidating at a time.
static CONSOLIDATE_LOCK: AtomicBool = AtomicBool::new(false);
/// The consolidate iteration. This is used by callers of `do_consolidate` to know when their
/// iteration has been finished by the background thread.
static CONSOLIDATE_ITERATION: AtomicUsize = AtomicUsize::new(0);
/// The global `SystemHandle`
static SYSTEM_HANDLE: parking_lot::Mutex<Option<SystemHandle>> = parking_lot::Mutex::new(None);

lazy_static! {
    /// The list of pthread handlers that are registered in the system. A thread inserts itself
    /// here when it is spawned.
    static ref THREAD_HANDLERS: StdMutex<HashSet<usize>> = StdMutex::new(HashSet::new());
    /// Thread counters which we use as a poor man's condvar. We need three of them: (1) signal
    /// handler arrival and thread id retrieval, (2) register that all threads have written their
    /// `Vec<Guard>` address to global memory, and (3) for waiting for the `fork` to complete.
    static ref THREAD_COUNTERS: [AtomicUsize; 3] = Default::default();
    /// An array in which the threads store a pointer to their thread local `Vec<Guard>`.
    /// Alternatively we could just store the root pointers in here, but that requires a little
    /// more careful pointer management.
    static ref THREAD_DATAS: RwLock<Vec<Option<ThreadData>>> =
        RwLock::new(Vec::with_capacity(32));
    /// A global map of allocated memory that has not yet been freed; entries may or may not be
    /// reachable. The map goes from allocation address to the address of its `Trace` vtable.
    static ref ALLOCATED_ADDRS: StdMutex<HashMap<usize, usize>> = StdMutex::new(HashMap::new());
    /// Flag for the threads to wait on in the signal handler. We need this so that we know how many
    /// threads were signaled before they do anything in the handler.
    static ref SIGNAL_HANDLER_WAIT: AtomicBool = AtomicBool::new(true);
}

/// This is all the data that the consolidator must send to the background thread, in order for it
/// to know what we've been doing.
enum BackgroundThreadInfo {
    ReclamationPass {
        /// The number of threads that are still in the signal handler
        thread_count: usize,
        /// The memory mapped area we use for IPC.
        map: memmap::MmapMut,
        /// The new allocations from one reclamation pass.
        new_allocs: Vec<(usize, usize)>,
        /// The PID of the spawned child
        child_pid: i32,
    },
    ActivateThread(*const AtomicPtr<AtomicBool>),
}

unsafe impl Send for BackgroundThreadInfo {}

/// Data for the system.
struct SystemHandle {
    /// This `Sender` is the only way to communicate to the background thread.
    sender: Sender<BackgroundThreadInfo>,
}

/// Data that a thread writes out in its signal handler. This is the thread-local data that
/// the consolidator needs in order to see what the thread has been up to since the last
/// consolidation.
struct ThreadData {
    /// A pointer to its thread-local `Vec<*const Guard>`. That is, this is a pointer to a `Vec` in
    /// the thread's TLS, which itself contains addresses on that thread's stack, where `Guard`s
    /// reside.
    guards: AtomicPtr<Vec<guard::GuardPointer>>,
    /// A `Vec` of allocations that the thread has gotten since the previous consolidation. We must
    /// use `SignalVec` instead of a regular `Vec` here since the consolidator must be able to
    /// `clear` the vector, without messing up whatever the thread was doing at the time it got
    /// signaled.
    allocs: ::std::sync::Arc<UnsafeCell<signalvec::SignalVec>>,
}

// `SignalVec` isn't `Send` because it contains a `*mut` ptr, but we need it to be, in order for us
// to have a `Vec<ThreadData>` in `lazy_static`. `UnsafeCell` is not `Sync` either, since it could
// allow data races. However, due to how our signal handlers work, we know that only one thread is
// accessing the `ThreadData` at any time. Therefore, it should be OK.
unsafe impl Send for ThreadData {}
unsafe impl Sync for ThreadData {}

thread_local! {
    static THREAD_INIT: RefCell<parking_lot::Once> = RefCell::new(Once::new());
}
/// Do initialization for a new thread. This should be called by all threads that use the system in
/// any way.
///
/// It is safe to call this multiple times.
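///
/// A minimal sketch (not run as a doctest) of the intended call pattern:
///
/// ```ignore
/// cmr::thread_activate();
/// cmr::thread_activate(); // fine: the per-thread `Once` makes repeated calls a no-op
/// // ... use the system ...
/// cmr::thread_deactivate();
/// ```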
pub fn thread_activate() {
    let init = || {
        alloc::ALLOC.with(|a| { black_box(a); });
        guard::ROOTS.with(|a| { black_box(a); });
        let addr = alloc::SOME_LOCK_MARKER.with(|b| {
            let ptr = &*b as *const AtomicPtr<AtomicBool>;
            ptr
        });
        SYSTEM_HANDLE
            .lock()
            .as_ref()
            .expect("SYSTEM_HANDLE should have been Some (thread_activate)")
            .sender
            .send(BackgroundThreadInfo::ActivateThread(addr))
            .expect("thread_activate failed to send background thread message");
        alloc::SOME_LOCK_MARKER.with(|b| while b.load(SeqCst).is_null() {});

        compiler_fence(SeqCst);
        // Registering the signal without being in the thread list is OK, since no thread can
        // signal us without having our handle.
        unsafe {
            let mut sigset = std::mem::uninitialized();
            libc::sigemptyset(&mut sigset);
            let sigaction = libc::sigaction {
                sa_sigaction: signal_handler as *const fn() as usize,
                sa_mask: sigset,
                sa_flags: libc::SA_NODEFER,
                sa_restorer: None,
            };
            libc::sigaction(libc::SIGUSR1, &sigaction, std::ptr::null_mut());
        }
        let me = thread_id();
        // This lock will block at most for the duration it takes for the consolidator to iterate
        // through the list and signal everyone. This will not deadlock, since we are not in the
        // thread list yet, so we will not be signaled by that thread. After taking the lock, the
        // consolidator risks waiting for us, but that is also fine, as we won't be signaled during
        // that time (since we have the lock).
        THREAD_HANDLERS.lock().unwrap().insert(me);
    };
    THREAD_INIT.with(|i| i.borrow().call_once(init));
}

/// Do destruction of the thread local system. This is needed in order to have fine-grained control
/// over what happens during thread shutdown.
pub fn thread_deactivate() {
    let destroy = || {
        let me = thread_id();
        let succ = THREAD_HANDLERS.lock().unwrap().remove(&me);
        assert!(succ);
    };
    THREAD_INIT.with(|i| {
        destroy();
        let mut once = i.borrow_mut();
        assert_eq!(once.state(), parking_lot::OnceState::Done);
        *once = Once::new();
    });
}

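/// Returns `true` if `thread_activate` has completed on the current thread (and
/// `thread_deactivate` has not since reset it).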
pub fn is_thread_active() -> bool {
    THREAD_INIT.with(|c| c.borrow().state() == parking_lot::OnceState::Done)
}

/// Initialize the global system. This should be called by some thread, before any thread does
/// anything with the system.
///
/// It is safe to call this multiple times.
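///
/// A minimal sketch (not run as a doctest):
///
/// ```ignore
/// cmr::global_init();
/// cmr::global_init(); // fine: the `Once` ensures the background thread is only spawned once
/// ```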
pub fn global_init() {
    static INIT: parking_lot::Once = parking_lot::ONCE_INIT;
    INIT.call_once(|| {
        let (send, recv) = channel();
        let mut sys_handle = SYSTEM_HANDLE.try_lock().expect(
            "SYSTEM_HANDLE.lock() should never block since we're initializing it!",
        );
        let _handle = std::thread::spawn(|| background_thread(recv));
        *sys_handle = Some(SystemHandle { sender: send });
    });
}

/// Run the given closure repeatedly until it completes without being interrupted by a signal. The
/// signal handler must set the flag `WAS_SIGNALED` to `true` for this to work.
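///
/// The closure may run more than once, so its effects must be safe to repeat. A minimal sketch
/// (not run as a doctest); `read_shared_state` is a hypothetical, repeatable closure:
///
/// ```ignore
/// let snapshot = cmr::without_reclamation_repeat(|| read_shared_state());
/// ```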
// TODO: Rename this to something less ambiguous
pub fn without_reclamation_repeat<F, R>(mut f: F) -> R
where
    F: FnMut() -> R,
{
    IS_IN_ATOMIC.with(|a| unsafe {
        if *a.get() {
            panic!("nested atomic() calls are not sound!");
        }
        *a.get() = true;
    });
    loop {
        WAS_SIGNALED.with(|ws| unsafe { *ws.get() = false });
        compiler_fence(SeqCst);
        let ret = f();
        compiler_fence(SeqCst);
        let got_signal = WAS_SIGNALED.with(|ws| unsafe { *ws.get() });
        if !got_signal {
            IS_IN_ATOMIC.with(|a| unsafe { *a.get() = false });
            return ret;
        }
        // println!("atomic retry");
    }
}

/// This is the core procedure of the system. When a thread thinks it is time to reclaim memory it
/// calls this procedure.
fn consolidate() -> Option<usize> {
    #[cfg(feature = "noop")]
    {
        let this_iter = CONSOLIDATE_ITERATION.load(SeqCst) + 1;
        return Some(this_iter);
    }
    if CONSOLIDATE_LOCK.compare_and_swap(false, true, SeqCst) {
        return None;
    }
    compiler_fence(SeqCst);

    let mut t = Timer::new();

    // Make sure that no thread has taken the internal malloc lock, so that we control who can
    // alloc. This also ensures that any thread in `without_reclamation` will complete its
    // operation without us signaling it.
    // let alloc_lock = alloc::ALLOC_LOCK.lock();
    let alloc_lock = alloc::SOME_LOCK.lock();
    assert!(
        alloc_lock.is_some(),
        "Somehow, we are the reclaimer in the middle of allocating??"
    );

    let _stdout = ::std::io::stdout();
    let mut _stdout_lock = _stdout.lock();
    let _stderr = ::std::io::stderr();
    let mut _stderr_lock = _stderr.lock();
    use std::io::Write;
    _stdout_lock.flush().unwrap();
    _stderr_lock.flush().unwrap();

    let mut map = memmap::MmapMut::map_anon(MMAP_SIZE).unwrap();
    SIGNAL_HANDLER_WAIT.store(true, SeqCst);
    THREAD_COUNTERS[1].store(0, SeqCst);
    THREAD_COUNTERS[2].store(0, SeqCst);

    let count = signal_threads_except_self();

    if let Ok(mut v) = THREAD_DATAS.write() {
        v.reserve(count);
        v.clear();
        for _ in 0..count {
            v.push(None);
        }
    } else {
        panic!("Consolidator failed to get write lock into THREAD_DATAS");
    }
    SIGNAL_HANDLER_WAIT.store(false, SeqCst);
    // Wait for all threads to enter the signal handler and read their ID.
    t.mark("signaled threads");

    while THREAD_COUNTERS[0].load(SeqCst) != count {}

    // Wait for all threads to write out their `ThreadData`.
    while THREAD_COUNTERS[1].load(SeqCst) != count {}
    t.mark("wait for threads");

    // This is after all threads are finished in the handler, so all threads have released their
    // read locks.
    let mut thread_datas = THREAD_DATAS.try_write().expect(
        "This should never block! (consolidate)",
    );

    let mut roots = thread_datas
        .iter_mut()
        .take(count)
        .map(|td| td.as_mut().map(|td| *td.guards.get_mut()))
        .chain(once(get_thread_data_ptrs().0))
        .flat_map(|ptr| {
            if ptr.is_none() {
                // If this is still `None`, it means that some thread was signaled while shutting
                // down. Decrement once so we don't get stuck at the end of this function.
                THREAD_COUNTERS[0].fetch_sub(1, SeqCst);
                return None;
            }
            ptr
        })
        .flat_map(|v_ptr| unsafe { &*v_ptr })
        .flat_map(guard::GuardPointer::deref_to_trait_object)
        .collect::<Vec<_>>();

    // Special case the global roots, since they're a different type.
    if let Ok(v) = guard::GLOBAL_ROOTS.try_lock() {
        roots.extend(v.iter().flat_map(
            guard::GuardPointer::deref_to_trait_object,
        ));
    } else {
        panic!(
            "Oh no, we couldn't get the global roots! This is not good, since reacehable memory \
             will not be detected as such!"
        )
    };

    t.mark("Get all roots");

    // Steal the new allocs from all threads, so that the background thread can free those
    // allocations that are no longer reachable.
    let mut new_allocs = Vec::new();
    for sv_ptr in thread_datas
        .iter_mut()
        .take(count)
        .map(|td| {
            td.as_ref().map(|td| unsafe {
                &mut *td.allocs.get() as *mut SignalVec
            })
        })
        .chain(once(get_thread_data_ptrs().1))
        .flat_map(|p| p)
    {
        let sv: &SignalVec = unsafe { &*sv_ptr };
        let len = sv.len();
        let slice = sv.slice(len);
        for &trait_object in slice {
            let addr = trait_object.data as usize;
            let vtable = trait_object.vtable as usize;
            assert!(addr != 0);
            new_allocs.push((addr, vtable));
        }
        sv.clear();
    }

    t.mark("Get all allocs");

    drop(thread_datas);

    let mut c: Cursor<&mut [u8]> = Cursor::new(&mut map);
    c.write_u64::<NativeEndian>(MARKER as u64).unwrap();

    t.mark("Write marker");

    let me = thread_id();
    let child_pid = match unsafe { libc::fork() } {
        0 => {
            let c = thread_id();
            assert_eq!(me, c);
            drop(_stdout_lock);
            drop(_stderr_lock);
            drop(_stdout);
            drop(_stderr);

            let seen = mark_and_sweep(Cursor::new(&mut map), roots);
            // TODO: do we need this flush? What do we know of the ordering in which data in the
            // memmap appears?
            map.flush().unwrap();
            let mut c: Cursor<&mut [u8]> = Cursor::new(&mut map);
            c.write_u64::<NativeEndian>(seen as u64).unwrap();
            drop(map);

            // The lock in `SignalVec::drop` might deadlock with the background thread (which is
            // not running in this process), so we just forget about the whole thing here.
            alloc::ALLOC.with(|arc| unsafe {
                (*arc.get()).is_dropped = true;
            });
            std::process::exit(0);
        }
        -1 => {
            panic!("fork() failed!");
        }
        pid => pid,
    };
    compiler_fence(SeqCst);
    drop(alloc_lock);

    // We're done reading what we need. Signal to the waiting threads that we're done.
    THREAD_COUNTERS[2].store(1, SeqCst);

    drop(_stdout_lock);
    drop(_stderr_lock);

    t.mark("fork");

    let this_iter = CONSOLIDATE_ITERATION.load(SeqCst) + 1;

    let bti = BackgroundThreadInfo::ReclamationPass {
        thread_count: count,
        new_allocs,
        map: map,
        child_pid,
    };
    SYSTEM_HANDLE
        .lock()
        .as_ref()
        .expect("SYSTEM_HANDLE should have been Some")
        .sender
        .send(bti)
        .expect("Failed to send message to the background thread!");
    t.mark("send to background thread");

    // t.present();
    return Some(this_iter);

    /// Send `SIGUSR1` to all registered threads except the current thread. Return the number of
    /// threads we successfully signaled.
    fn signal_threads_except_self() -> usize {
        let mut count = 0;

        unsafe {
            let me = thread_id();

            let mut th = THREAD_HANDLERS.lock().unwrap();

            th.retain(|&th| if th == me {
                true
            } else {
                let val = libc::sigval { sival_ptr: std::ptr::null_mut() };
                let r = libc::pthread_sigqueue(th as u64, libc::SIGUSR1, val);
                if r == 0 {
                    count += 1;
                    true
                } else {
                    false
                }
            });
        }
        count
    }
}

/// Perform the mark phase of the mark-and-sweep algorithm: traverse the object graph from `roots`
/// and write the address of every reachable allocation into `cursor`, starting at byte offset 8
/// (offset 0 holds the `MARKER`/count word). Return the number of addresses written.
fn mark_and_sweep(mut cursor: Cursor<&mut [u8]>, roots: Vec<TraitObject>) -> usize {
    // TODO: justify these magic numbers!
    // Since we have the reentrant mutex in the allocator, we know that no other thread is
    // allocating. Therefore, since we don't get here if some thread was allocating while signaled,
    // this is alright.
    //
    let mut seen = HashMap::with_capacity(2048);
    let mut queue = VecDeque::with_capacity(100);

    for &trait_object in roots.iter() {
        let mut to = trait_object;
        to.data = ptr::addr(to.data) as *mut ();
        let addr = to.data as usize;
        let vtable = to.vtable as usize;
        if addr != 0 && seen.insert(addr, vtable).is_none() {
            queue.push_back(to);
        }
    }

    let mut num_ptr = 0;
    cursor.set_position(8);

    let mut ptr_buffer: [TraitObject; 32] = unsafe { std::mem::zeroed() };

    while let Some(to) = queue.pop_front() {
        assert_eq!(ptr::tag(to.data), 0);
        let addr = to.data as usize;
        let t: &ptr::Trace = unsafe { ::std::mem::transmute(to) };

        cursor.write_u64::<NativeEndian>(addr as u64).unwrap();
        num_ptr += 1;

        // NOTE: this will panic if `t` wants to write more than 32 pointers.
        let n = t.write(&mut ptr_buffer);
        for i in 0..n {
            // remove the tag
            let mut to = ptr_buffer[i];
            to.data = ptr::addr(to.data) as *mut ();
            let addr = to.data as usize;
            let vtable = to.vtable as usize;
            if seen.insert(addr, vtable).is_none() {
                queue.push_back(to);
            }
        }
    }
    num_ptr
}

/// Return pointers to the current thread's thread-local data: its guard roots and its `SignalVec`
/// of new allocations.
fn get_thread_data_ptrs() -> (Option<*mut Vec<guard::GuardPointer>>, Option<*mut SignalVec>) {
    let roots_ptr = match guard::ROOTS.try_with(std::cell::RefCell::as_ptr) {
        Ok(p) => p,
        Err(_) => return (None, None),
    };
    let alloc_ptr = match alloc::ALLOC.try_with(|a| (*a).get()) {
        Ok(p) => p,
        Err(_) => return (None, None),
    };
    (Some(roots_ptr), Some(alloc_ptr))
}

/// This is the signal handler that is run when the thread receives `SIGUSR1`
fn signal_handler() {
    // locally register that we've been signaled. Used in `atomic(|| )`.
    if WAS_SIGNALED
        .try_with(|ws| unsafe { *ws.get() = true })
        .is_err()
    {
        // We're in the middle of being destroyed. We increment the counters here, but do not write
        // the entry pointer. The consolidator will notice this, and decrement the first counter.
        THREAD_COUNTERS[0].fetch_add(1, SeqCst);
        THREAD_COUNTERS[1].fetch_add(1, SeqCst);
        return;
    };
    while SIGNAL_HANDLER_WAIT.load(SeqCst) {}
    let id = THREAD_COUNTERS[0].fetch_add(1, SeqCst);

    {
        // This read lock will not block: `THREAD_DATAS` is locked twice in `consolidate`: the
        // first time is before we are signaled, and the last time is while we are waiting for
        // `THREAD_COUNTERS[2]` to become `1`. Assert this by using `try_read` instead of `read`.
        let ptr: *mut Option<ThreadData> = &THREAD_DATAS.try_read().expect(
            "This should never block! (signal_handler)",
        )
            [id] as *const Option<ThreadData> as _;
        let roots = match guard::ROOTS.try_with(std::cell::RefCell::as_ptr) {
            Ok(r) => r,
            Err(_) => {
                // Don't decrement `THREAD_COUNTERS[0]`: this is done by the consolidator as we
                // don't write anything to `ptr`.
                THREAD_COUNTERS[1].fetch_add(1, SeqCst);
                return;
            }
        };
        let allocs = match alloc::ALLOC.try_with(|a| a.clone()) {
            Ok(a) => a,
            Err(_) => {
                // See above comment.
                THREAD_COUNTERS[1].fetch_add(1, SeqCst);
                return;
            }
        };
        // We know that we're the only thread looking at this entry: other threads in signal
        // handlers get a different id from `fetch_add`, and the consolidator doesn't touch this
        // memory until after `THREAD_COUNTERS[1]` has been set to the number of threads. Thus, it
        // is safe to mutate it.
        unsafe {
            let td = Some(ThreadData {
                guards: AtomicPtr::new(roots),
                allocs: allocs,
            });
            ::std::ptr::write(ptr, td);
        }
        fence(SeqCst);
    }
    THREAD_COUNTERS[1].fetch_add(1, SeqCst);
    while THREAD_COUNTERS[2].load(SeqCst) != 1 {
        ::std::thread::yield_now();
    }
    // Register that we're leaving the signal handler. This is needed so that another signal does
    // not arrive while we're still inside the `while` loop above.
    THREAD_COUNTERS[0].fetch_sub(1, SeqCst);
}

/// Force consolidation to happen, and wait for completion. This is good for testing.
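///
/// A minimal sketch (not run as a doctest), mirroring the `memory_is_reclaimed` test below:
///
/// ```ignore
/// {
///     guard!(g);
///     let _p = cmr::alloc(g, 123usize);
/// } // the guard goes out of scope, so the allocation becomes unreachable
/// cmr::do_consolidate(); // force a pass; the background thread may now free the allocation
/// ```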
pub fn do_consolidate() {
    #[cfg(feature = "noop")] return;
    // If some thread already is consolidating, wait for it to finish.
    while CONSOLIDATE_LOCK.load(SeqCst) {
        ::std::thread::yield_now();
    }
    let our_iter;
    loop {
        if let Some(n) = consolidate() {
            our_iter = n;
            break;
        }
    }
    // Wait for the background thread to finish. We might not see the actual iteration number be
    // exactly what we want, so we can't use `==`.
    while CONSOLIDATE_ITERATION.load(SeqCst) < our_iter {
        ::std::thread::yield_now();
    }
}


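/// An `AtomicBool` padded out to a full cache line (31 + 1 + 32 = 64 bytes), presumably to avoid
/// false sharing between the per-thread flags stored contiguously in `THREAD_BOOLS`.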
#[repr(C)]
#[derive(Default)]
struct PaddedAtomicBool {
    _padding_front: [u8; 31],
    inner: AtomicBool,
    _padding_back: [u8; 32],
}

impl PaddedAtomicBool {
    fn new(b: bool) -> Self {
        PaddedAtomicBool {
            _padding_front: [0; 31],
            inner: AtomicBool::new(b),
            _padding_back: [0; 32],
        }
    }
}

impl ::std::ops::Deref for PaddedAtomicBool {
    type Target = AtomicBool;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

pub(crate) static THREAD_BOOLS: parking_lot::Mutex<Vec<PaddedAtomicBool>> =
    parking_lot::Mutex::new(Vec::new());

fn background_thread(recv: Receiver<BackgroundThreadInfo>) {
    // TODO: MAX NUMBER HERE!!
    const MAX_THREADS: usize = 1024 * 32;
    *THREAD_BOOLS.lock() = Vec::with_capacity(MAX_THREADS);
    loop {
        let (thread_count, mut map, new_allocs, child_pid) = match recv.recv() {
            Ok(BackgroundThreadInfo::ActivateThread(ptr)) => {
                let mut thread_bools = THREAD_BOOLS.lock();
                assert!(thread_bools.len() < MAX_THREADS);
                thread_bools.push(PaddedAtomicBool::new(false));
                let addr = &thread_bools[thread_bools.len() - 1] as *const _ as *mut AtomicBool;
                let atomic: &AtomicPtr<AtomicBool> = unsafe { &*ptr };
                atomic.store(addr, SeqCst);
                continue;
            }
            Ok(BackgroundThreadInfo::ReclamationPass {
                   thread_count,
                   map,
                   new_allocs,
                   child_pid,
               }) => (thread_count, map, new_allocs, child_pid),
            Err(_) => break,
        };

        // timed!("free thread",
        let marker = {
            let r: &[u8] = &map;
            let f: &u8 = &r[0];
            f as *const u8 as *const u64
        };
        let mut cursor: Cursor<&mut [u8]> = Cursor::new(&mut map);
        // `MARKER` at position 0 signals that the child process isn't done yet. Wait until the
        // child process has written out all pointers: we know this has happened once position 0,
        // which initially contains `MARKER`, has been overwritten with the pointer count.
        let mut ptr_count = MARKER;
        while ptr_count == MARKER {
            cursor.set_position(0);
            ptr_count = cursor.read_u64::<NativeEndian>().unwrap();
        }

        let reachable = {
            let a = 8;
            let b = a + ptr_count as usize * 8;
            let s = cursor.get_ref()[a..b].as_ptr() as *const usize;
            assert_eq!(ptr_count as usize, (b - a) / 8);
            unsafe { std::slice::from_raw_parts(s, (b - a) / 8) }
        }.iter()
            .cloned()
            .collect::<HashSet<usize>>();

        {
            // This may block if some thread is in `SignalVec::Drop`, but that's OK, as no thread in
            // its signal handler has to wait for this thread. Thus, the dropping thread will
            // eventually release the lock, and we may continue.
            let mut map = ALLOCATED_ADDRS.lock().unwrap();
            // Remove all pointers from `map` that aren't reachable.
            map.retain(|&addr, _size| {
                let keep = reachable.contains(&addr);
                if !keep {
                    alloc::free(addr);
                }
                keep
            });

            // For all new allocations, free those that aren't reachable. Those that are reachable
            // are inserted into `map`.

            for &(addr, vtable) in &new_allocs {
                if ptr::tag(ptr::NullablePtr::<()>::new(addr)) != 0 {
                    panic!("checked new tagged thing!!");
                }
                if reachable.contains(&addr) {
                    map.insert(addr, vtable);
                } else {
                    alloc::free(addr);
                }
            }
        };

        // Wait for all threads in the signal handler to exit it, so that we know no thread is
        // messing with our system. Given fair scheduling, no thread should still be here, but if
        // one is, we yield execution to speed things up.
        while THREAD_COUNTERS[0].load(SeqCst) > 0 {
            ::std::thread::yield_now();
        }
        unsafe {
            // We have to run the destructors for the `Arc`s, so that the `SignalVec`s get dropped. This
            // is annoying to do manually. TODO: Look into just having the `Vec` on the stack here?
            // This could be fine, as long as we allocate it *before* signaling the threads.  This
            // way we'd get the cleanup for free, in the `Drop` call.
            let mut vecs = THREAD_DATAS.try_write().unwrap();
            vecs.set_len(thread_count);
            vecs.clear();

            // We don't need the exit status, so pass a null pointer instead of writing through a
            // pointer derived from a shared reference to a constant.
            libc::waitpid(child_pid, std::ptr::null_mut(), 0);
        };

        // Mark that we've finished one iteration
        CONSOLIDATE_ITERATION.fetch_add(1, SeqCst);
        if !CONSOLIDATE_LOCK.compare_and_swap(true, false, SeqCst) {
            panic!("We failed to CAS the consolidate lock back! This should never happen!!");
        };
        // );
    }
}

#[cfg(test)]
mod root_test {
    use super::alloc::without_reclamation;
    use super::*;
    use std::thread::spawn as stdspawn;

    fn spawn<F: Fn() + Send + 'static>(f: F) -> ::std::thread::JoinHandle<()> {
        alloc::without_reclamation(|| stdspawn(f))
    }

    #[cfg(feature = "sanitize")]
    fn is_valid_addr(addr: usize) -> bool {
        let a = alloc::ALLOCATIONS.lock().unwrap();
        a.contains(&addr)
    }

    #[cfg(not(feature = "sanitize"))]
    fn is_valid_addr(addr: usize) -> bool {
        println!("WARNING: testing without 'sanitize'");
        true
    }

    #[test]
    fn guard_protects() {
        let _t = ::test::test_init();
        guard!(g);
        let p: ptr::Ptr<usize> = alloc(g, 123);
        let addr: usize = ptr::raw(p);
        println!("alloc addr = {:x}", addr);
        assert!(is_valid_addr(addr));

        do_consolidate();

        assert!(is_valid_addr(addr));
        println!("{:?}", ptr::raw(p));
    }

    #[test]
    fn memory_is_reclaimed() {
        let _t = ::test::test_init();
        let addr: usize;
        {
            guard!(g);
            let p: ptr::Ptr<usize> = alloc(g, 123);
            addr = ptr::raw(p);
            assert!(is_valid_addr(addr));
        }

        do_consolidate();

        assert!(!is_valid_addr(addr));
    }

    #[test]
    fn consolidate_spam() {
        let _t = ::test::test_init();
        let handles = (0..20)
            .map(|_| {
                without_reclamation(|| {
                    spawn(|| {
                        let _t = ::test::test_init();
                        do_consolidate();
                    })
                })
            })
            .collect::<Vec<_>>();
        for h in handles.into_iter() {
            assert!(h.join().is_ok());
        }
    }

    #[test]
    fn attempt_global_root_deadlock() {
        use super::SharedGuard;
        let _t = ::test::test_init();
        let handles = (0..8)
            .map(|_| {
                without_reclamation(|| {
                    spawn(|| unsafe {
                        let _t = ::test::test_init();
                        for _ in 0..10 {
                            let mut sh: SharedGuard<usize> = SharedGuard::new();
                            sh.register();
                            do_consolidate();
                        }
                    })
                })
            })
            .collect::<Vec<_>>();
        for h in handles.into_iter() {
            assert!(h.join().is_ok());
        }
    }
}

mod bench {
    extern crate test;
    use bench::test::{black_box, Bencher};

    use super::*;

    #[bench]
    fn cmr_guard_1k(b: &mut Bencher) {
        global_init();
        let _t = ::test::test_init();
        b.iter(|| for _ in 0..1000 {
            guard!(g);
            let _: &mut Guard<u64> = g;
        });
    }

    #[bench]
    fn cmr_10_guards_1k_slow(b: &mut Bencher) {
        global_init();
        let _t = ::test::test_init();
        b.iter(|| for _ in 0..1000 {
            guard!(g0);
            guard!(g1);
            guard!(g2);
            guard!(g3);
            guard!(g4);
            guard!(g5);
            guard!(g6);
            guard!(g7);
            guard!(g8);
            guard!(g9);
            let _: &mut Guard<u64> = g0;
            let _: &mut Guard<u64> = g1;
            let _: &mut Guard<u64> = g2;
            let _: &mut Guard<u64> = g3;
            let _: &mut Guard<u64> = g4;
            let _: &mut Guard<u64> = g5;
            let _: &mut Guard<u64> = g6;
            let _: &mut Guard<u64> = g7;
            let _: &mut Guard<u64> = g8;
            let _: &mut Guard<u64> = g9;
        });
    }


    #[bench]
    fn cmr_10_guards_1k(b: &mut Bencher) {
        global_init();
        let _t = ::test::test_init();
        b.iter(|| for _ in 0..1000 {
            guards!(g0, g1, g2, g3, g4, g5, g6, g7, g8, g9);
            let _: &mut Guard<u64> = g0;
            let _: &mut Guard<u64> = g1;
            let _: &mut Guard<u64> = g2;
            let _: &mut Guard<u64> = g3;
            let _: &mut Guard<u64> = g4;
            let _: &mut Guard<u64> = g5;
            let _: &mut Guard<u64> = g6;
            let _: &mut Guard<u64> = g7;
            let _: &mut Guard<u64> = g8;
            let _: &mut Guard<u64> = g9;
        });
    }

    #[bench]
    fn cmr_alloc_1k(b: &mut Bencher) {
        #[cfg(not(feature = "disable-reclamation"))]
        {
            panic!("Should not run cmr_alloc_1k without disabling reclamation");
        }
        global_init();
        let _t = ::test::test_init();
        guard!(g);
        b.iter(|| for _ in 0..1000 {
            let p = alloc(g, 1);
            black_box(p);
        });
    }

    #[bench]
    fn raw_alloc_1k(b: &mut Bencher) {
        b.iter(|| for _ in 0..1000 {
            let r = std::boxed::Box::into_raw(std::boxed::Box::new(0));

            black_box(r);
        });
    }
}