diff --git a/README.md b/README.md
index a30f073..238ffef 100644
--- a/README.md
+++ b/README.md
@@ -6,8 +6,7 @@ A concurrent, append-only vector.
 
-The vector provided by this crate suports concurrent `get` and `push` operations.
-Reads are always lock-free, as are writes except when resizing is required.
+The vector provided by this crate supports concurrent `get` and `push` operations. All operations are lock-free.
 
 # Examples
 
@@ -74,13 +73,3 @@ fn main() {
     assert_eq!(*x, 2);
 }
 ```
-
-# Performance
-
-Below is a benchmark in which an increasing number of elements are pushed and read from the vector
-by 12 threads, comparing `boxcar::Vec` to `RwLock`:
-
-Benchmark
-
-The results show that `boxcar::Vec` scales very well under load, performing significantly better
-than lock-based solutions.
diff --git a/src/raw.rs b/src/raw.rs
index 0294c04..820630b 100644
--- a/src/raw.rs
+++ b/src/raw.rs
@@ -1,23 +1,23 @@
 use std::cell::UnsafeCell;
 use std::mem::{self, MaybeUninit};
 use std::ops::Index;
-use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
-use std::sync::Mutex;
+use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicUsize, Ordering};
 use std::{ptr, slice};
 
-const BUCKETS: usize = (usize::BITS + 1) as _;
-const MAX_ENTRIES: usize = usize::MAX;
+const BUCKETS: usize = (usize::BITS as usize) - SKIP_BUCKET;
+const MAX_ENTRIES: usize = usize::MAX - SKIP;
 
 // A lock-free, append-only vector.
 pub struct Vec<T> {
-    // buckets of length 1, 1, 2, 4, 8 .. 2^63
-    buckets: [Bucket<T>; BUCKETS],
-    // the number of elements in this vector
-    count: AtomicUsize,
     // a counter used to retrieve a unique index to push to.
+    //
     // this value may be more than the true length as it will
     // be incremented before values are actually stored.
-    inflight: AtomicUsize,
+    inflight: AtomicU64,
+    // buckets of length 32, 64 .. 2^63
+    buckets: [Bucket<T>; BUCKETS],
+    // the number of initialized elements in this vector
+    count: AtomicUsize,
 }
 
 unsafe impl<T: Send> Send for Vec<T> {}
@@ -40,105 +40,12 @@ impl<T> Vec<T> {
         }
 
         Vec {
-            buckets: buckets.map(Bucket::from_raw),
-            inflight: AtomicUsize::new(0),
+            buckets: buckets.map(Bucket::new),
+            inflight: AtomicU64::new(0),
             count: AtomicUsize::new(0),
         }
     }
 
-    // Reserves capacity for at least `additional` more elements to be inserted
-    // in the given `Vec`. The collection may reserve more space to avoid
-    // frequent reallocations.
-    pub fn reserve(&self, additional: usize) {
-        let len = self.count.load(Ordering::Acquire);
-        let location = Location::of(len.checked_add(additional).unwrap_or(MAX_ENTRIES));
-
-        let mut bucket_index = location.bucket;
-        let mut bucket_len = location.bucket_len;
-
-        // allocate buckets starting from the bucket at `len + additional` and
-        // working our way backwards
-        loop {
-            // SAFETY: we have enough buckets for `usize::MAX` entries
-            let bucket = unsafe { self.buckets.get_unchecked(bucket_index) };
-
-            // reached an initalized bucket, we're done
-            if !bucket.entries.load(Ordering::Acquire).is_null() {
-                break;
-            }
-
-            // guard against concurrent allocations
-            let _allocating = bucket.lock.lock().unwrap();
-
-            // someone allocated before us
-            if !bucket.entries.load(Ordering::Relaxed).is_null() {
-                break;
-            }
-
-            // otherwise, allocate the bucket
-            let new_entries = Bucket::alloc(bucket_len);
-            bucket.entries.store(new_entries, Ordering::Release);
-
-            if bucket_index == 0 {
-                break;
-            }
-
-            bucket_index -= 1;
-            bucket_len = Location::bucket_len(bucket_index);
-        }
-    }
-
-    // Appends an element to the back of the vector.
-    pub fn push(&self, value: T) -> usize {
-        let index = self.inflight.fetch_add(1, Ordering::Relaxed);
-        let location = Location::of(index);
-
-        // SAFETY: we have enough buckets for usize::MAX entries.
-        // we assume that `inflight` cannot realistically overflow.
-        let bucket = unsafe { self.buckets.get_unchecked(location.bucket) };
-        let mut entries = bucket.entries.load(Ordering::Acquire);
-
-        // the bucket has not been allocated yet
-        if entries.is_null() {
-            // guard against concurrent allocations
-            let _allocating = bucket.lock.lock().unwrap();
-
-            let new_entries = bucket.entries.load(Ordering::Acquire);
-            if !new_entries.is_null() {
-                // someone allocated before us
-                entries = new_entries;
-            } else {
-                // otherwise allocate the bucket
-                let alloc = Bucket::alloc(location.bucket_len);
-                bucket.entries.store(alloc, Ordering::Release);
-                entries = alloc;
-            }
-        }
-
-        unsafe {
-            // SAFETY: `location.entry` is always in bounds for `location.bucket`
-            let entry = &*entries.add(location.entry);
-
-            // SAFETY: we have unique access to this entry.
-            //
-            // 1. it is impossible for another thread to attempt
-            // a `push` to this location as we retreived it with
-            // a `inflight.fetch_add`.
-            //
-            // 2. any thread trying to `get` this entry will see
-            // `active == false`, and will not try to access it
-            entry.slot.get().write(MaybeUninit::new(value));
-
-            // let other threads know that this slot
-            // is active
-            entry.active.store(true, Ordering::Release);
-        }
-
-        self.count.fetch_add(1, Ordering::Release);
-
-        index
-    }
-
     // Returns the number of elements in the vector.
     pub fn count(&self) -> usize {
         self.count.load(Ordering::Acquire)
@@ -148,7 +55,7 @@ impl<T> Vec<T> {
     pub fn get(&self, index: usize) -> Option<&T> {
         let location = Location::of(index);
 
-        // SAFETY: we have enough buckets for `usize::MAX` entries
+        // safety: `location.bucket` is always in bounds
         let entries = unsafe {
             self.buckets
                 .get_unchecked(location.bucket)
@@ -161,11 +68,11 @@
             return None;
         }
 
-        // SAFETY: `location.entry` is always in bounds for `location.bucket`
+        // safety: `location.entry` is always in bounds for its bucket
         let entry = unsafe { &*entries.add(location.entry) };
 
         if entry.active.load(Ordering::Acquire) {
-            // SAFETY: the entry is active
+            // safety: the entry is active
            unsafe { return Some(entry.value_unchecked()) }
         }
 
@@ -181,8 +88,7 @@
     pub unsafe fn get_unchecked(&self, index: usize) -> &T {
         let location = Location::of(index);
 
-        // SAFETY: caller guarantees the index is in bounds and
-        // the entry is present.
+        // safety: caller guarantees the entry is initialized
         unsafe {
             let entry = self
                 .buckets
                 .get_unchecked(location.bucket)
@@ -199,7 +105,7 @@
     pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
         let location = Location::of(index);
 
-        // SAFETY: we have enough buckets for `usize::MAX` entries
+        // safety: `location.bucket` is always in bounds
         let entries = unsafe {
             self.buckets
                 .get_unchecked_mut(location.bucket)
@@ -212,11 +118,11 @@
             return None;
         }
 
-        // SAFETY: `location.entry` is always in bounds for `location.bucket`
+        // safety: `location.entry` is always in bounds for its bucket
         let entry = unsafe { &mut *entries.add(location.entry) };
 
         if *entry.active.get_mut() {
-            // SAFETY: the entry is active
+            // safety: the entry is active
             unsafe { return Some(entry.value_unchecked_mut()) }
         }
 
@@ -232,8 +138,7 @@
     pub unsafe fn get_unchecked_mut(&mut self, index: usize) -> &mut T {
         let location = Location::of(index);
 
-        // SAFETY: caller guarantees index is in bounds and
-        // entry is present.
+        // safety: caller guarantees the entry is initialized
         unsafe {
             let entry = self
                 .buckets
                 .get_unchecked_mut(location.bucket)
@@ -246,6 +151,104 @@
         }
     }
 
+    // Appends an element to the back of the vector.
+    pub fn push(&self, value: T) -> usize {
+        let index = self.inflight.fetch_add(1, Ordering::Relaxed);
+        // the inflight counter is a `u64` to catch overflows of the vector's capacity
+        let index: usize = index.try_into().expect("overflowed maximum capacity");
+        let location = Location::of(index);
+
+        // eagerly allocate the next bucket if we are close to the end of this one
+        if index == (location.bucket_len - (location.bucket_len >> 3)) {
+            if let Some(next_bucket) = self.buckets.get(location.bucket + 1) {
+                Vec::get_or_alloc(next_bucket, location.bucket_len << 1);
+            }
+        }
+
+        // safety: `location.bucket` is always in bounds
+        let bucket = unsafe { self.buckets.get_unchecked(location.bucket) };
+        let mut entries = bucket.entries.load(Ordering::Acquire);
+
+        // the bucket has not been allocated yet
+        if entries.is_null() {
+            entries = Vec::get_or_alloc(bucket, location.bucket_len);
+        }
+
+        unsafe {
+            // safety: `location.entry` is always in bounds for its bucket
+            let entry = &*entries.add(location.entry);
+
+            // safety: we have unique access to this entry.
+            //
+            // 1. it is impossible for another thread to attempt a `push`
+            // to this location as we retrieved it from `inflight.fetch_add`
+            //
+            // 2. any thread trying to `get` this entry will see `active == false`,
+            // and will not try to access it
+            entry.slot.get().write(MaybeUninit::new(value));
+
+            // let other threads know that this entry is active
+            entry.active.store(true, Ordering::Release);
+        }
+
+        // increase the true count
+        self.count.fetch_add(1, Ordering::Release);
+        index
+    }
+
+    // race to initialize a bucket
+    fn get_or_alloc(bucket: &Bucket<T>, len: usize) -> *mut Entry<T> {
+        let entries = Bucket::alloc(len);
+        match bucket.entries.compare_exchange(
+            ptr::null_mut(),
+            entries,
+            Ordering::Release,
+            Ordering::Acquire,
+        ) {
+            Ok(_) => entries,
+            Err(found) => unsafe {
+                Bucket::dealloc(entries, len);
+                found
+            },
+        }
+    }
+
+    // Reserves capacity for at least `additional` more elements to be inserted
+    // in the given `Vec`. The collection may reserve more space to avoid
+    // frequent reallocations.
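+    //
+    // Note that this only pre-allocates bucket storage: it does not publish any
+    // elements or change `count`, so it can safely race with concurrent `push` calls.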
+    pub fn reserve(&self, additional: usize) {
+        let len = self.count.load(Ordering::Acquire);
+        let mut location = Location::of(len.checked_add(additional).unwrap_or(MAX_ENTRIES));
+
+        // allocate buckets starting from the bucket at `len + additional` and
+        // working our way backwards
+        loop {
+            // safety: `location.bucket` is always in bounds
+            let bucket = unsafe { self.buckets.get_unchecked(location.bucket) };
+
+            // reached an initialized bucket, we're done
+            if !bucket.entries.load(Ordering::Acquire).is_null() {
+                break;
+            }
+
+            // someone allocated before us
+            if !bucket.entries.load(Ordering::Relaxed).is_null() {
+                break;
+            }
+
+            // allocate the bucket
+            Vec::get_or_alloc(bucket, location.bucket_len);
+
+            // reached the first bucket
+            if location.bucket == 0 {
+                break;
+            }
+
+            location.bucket -= 1;
+            location.bucket_len = Location::bucket_len(location.bucket);
+        }
+    }
+
     // Returns an iterator over the vector.
     pub fn iter(&self) -> Iter {
         Iter {
@@ -253,8 +256,8 @@
             yielded: 0,
             location: Location {
                 bucket: 0,
-                bucket_len: 1,
                 entry: 0,
+                bucket_len: Location::bucket_len(0),
             },
         }
     }
@@ -278,8 +281,8 @@
             }
 
             let len = Location::bucket_len(i);
-            // SAFETY: we have &mut self
-            unsafe { drop(Box::from_raw(slice::from_raw_parts_mut(entries, len))) }
+            // safety: in drop
+            unsafe { Bucket::dealloc(entries, len) }
         }
     }
 }
@@ -300,7 +303,7 @@ impl Iter {
        // being stored in a bucket that we have already iterated over, so we
        // still have to check that we are in bounds
        while self.location.bucket < BUCKETS {
-            // SAFETY: bounds checked above
+            // safety: bounds checked above
            let entries = unsafe {
                vec.buckets
                    .get_unchecked(self.location.bucket)
@@ -315,9 +318,10 @@
            // `vec.count()` elements
            if !entries.is_null() {
                while self.location.entry < self.location.bucket_len {
-                    // SAFETY: bounds checked above
+                    // safety: bounds checked above
                    let entry = unsafe { &*entries.add(self.location.entry) };
                    let index = self.index;
+
                    self.location.entry += 1;
                    self.index += 1;
@@ -332,7 +336,10 @@
            self.location.entry = 0;
            self.location.bucket += 1;
-            self.location.bucket_len = Location::bucket_len(self.location.bucket);
+
+            if self.location.bucket < BUCKETS {
+                self.location.bucket_len = Location::bucket_len(self.location.bucket);
+            }
        }
 
        None
@@ -346,7 +353,7 @@
    pub unsafe fn next_owned<T>(&mut self, vec: &mut Vec<T>) -> Option<T> {
        self.next(vec).map(|(_, entry)| unsafe {
            entry.active.store(false, Ordering::Relaxed);
-            // SAFETY: `next` only yields initialized entries
+            // safety: `next` only yields initialized entries
            let value = mem::replace(&mut *entry.slot.get(), MaybeUninit::uninit());
            value.assume_init()
        })
@@ -358,7 +365,6 @@
 }
 
 struct Bucket<T> {
-    lock: Mutex<()>,
     entries: AtomicPtr<Entry<T>>,
 }
 
@@ -379,18 +385,13 @@
         Box::into_raw(entries) as _
     }
 
-    fn from_raw(entries: *mut Entry<T>) -> Bucket<T> {
-        Bucket {
-            lock: Mutex::new(()),
-            entries: AtomicPtr::new(entries),
-        }
+    unsafe fn dealloc(entries: *mut Entry<T>, len: usize) {
+        unsafe { drop(Box::from_raw(slice::from_raw_parts_mut(entries, len))) }
     }
-}
 
-impl<T> Drop for Entry<T> {
-    fn drop(&mut self) {
-        if *self.active.get_mut() {
-            unsafe { ptr::drop_in_place((*self.slot.get()).as_mut_ptr()) }
+    fn new(entries: *mut Entry<T>) -> Bucket<T> {
+        Bucket {
+            entries: AtomicPtr::new(entries),
+        }
     }
 }
 
@@ -400,7 +401,7 @@ impl<T> Entry<T> {
     //
     // Value must be initialized.
     unsafe fn value_unchecked(&self) -> &T {
-        // SAFETY: guaranteed by caller
+        // safety: guaranteed by caller
         unsafe { (*self.slot.get()).assume_init_ref() }
     }
 
@@ -408,11 +409,19 @@
     //
     // Value must be initialized.
     unsafe fn value_unchecked_mut(&mut self) -> &mut T {
-        // SAFETY: guaranteed by caller
+        // safety: guaranteed by caller
         unsafe { self.slot.get_mut().assume_init_mut() }
     }
 }
 
+impl<T> Drop for Entry<T> {
+    fn drop(&mut self) {
+        if *self.active.get_mut() {
+            unsafe { ptr::drop_in_place((*self.slot.get()).as_mut_ptr()) }
+        }
+    }
+}
+
 #[derive(Debug)]
 struct Location {
     // the index of the bucket
@@ -423,11 +432,18 @@
     entry: usize,
 }
 
+// skip the shorter buckets to avoid unnecessary allocations.
+// this also reduces the maximum capacity of a vector.
+const SKIP: usize = 32;
+const SKIP_BUCKET: usize = ((usize::BITS - SKIP.leading_zeros()) as usize) - 1;
+
 impl Location {
     fn of(index: usize) -> Location {
-        let bucket = (usize::BITS - index.leading_zeros()) as usize;
+        let skipped = index.checked_add(SKIP).expect("exceeded maximum length");
+        let bucket = usize::BITS - skipped.leading_zeros();
+        let bucket = (bucket as usize) - (SKIP_BUCKET + 1);
         let bucket_len = Location::bucket_len(bucket);
-        let entry = if index == 0 { 0 } else { index ^ bucket_len };
+        let entry = skipped ^ bucket_len;
 
         Location {
             bucket,
@@ -437,7 +453,7 @@
     }
 
     fn bucket_len(bucket: usize) -> usize {
-        1 << bucket.saturating_sub(1)
+        1 << (bucket + SKIP_BUCKET)
     }
 }
 
@@ -447,17 +463,33 @@ mod tests {
     #[test]
     fn location() {
-        let min = Location::of(0);
-        assert_eq!(min.bucket, 0);
+        assert_eq!(Location::bucket_len(0), 32);
+        for i in 0..32 {
+            let loc = Location::of(i);
+            assert_eq!(loc.bucket_len, 32);
+            assert_eq!(loc.bucket, 0);
+            assert_eq!(loc.entry, i);
+        }
+
+        assert_eq!(Location::bucket_len(1), 64);
+        for i in 33..96 {
+            let loc = Location::of(i);
+            assert_eq!(loc.bucket_len, 64);
+            assert_eq!(loc.bucket, 1);
+            assert_eq!(loc.entry, i - 32);
+        }
+
+        assert_eq!(Location::bucket_len(2), 128);
+        for i in 96..224 {
+            let loc = Location::of(i);
+            assert_eq!(loc.bucket_len, 128);
+            assert_eq!(loc.bucket, 2);
+            assert_eq!(loc.entry, i - 96);
+        }
 
         let max = Location::of(MAX_ENTRIES);
         assert_eq!(max.bucket, BUCKETS - 1);
-    }
-
-    #[test]
-    fn iterator() {
-        let vec = (0..100).collect::<Vec<_>>();
-        vec.iter().for_each(|(a, &b)| assert_eq!(a, b));
-        vec.iter().for_each(|(a, &b)| assert_eq!(vec[a], b));
+        assert_eq!(max.bucket_len, 1 << 63);
+        assert_eq!(max.entry, (1 << 63) - 1);
     }
 }
diff --git a/tests/vec.rs b/tests/vec.rs
new file mode 100644
index 0000000..5029fe5
--- /dev/null
+++ b/tests/vec.rs
@@ -0,0 +1,80 @@
+use std::{sync::Barrier, thread};
+
+#[test]
+fn simple() {
+    let vec = boxcar::vec![0, 1, 2];
+    for x in 3..1000 {
+        let i = vec.push(x);
+        assert_eq!(vec[i], x);
+    }
+
+    for i in 0..1000 {
+        assert_eq!(vec[i], i);
+    }
+
+    for (i, &x) in vec.iter() {
+        assert_eq!(i, x);
+    }
+
+    for (i, x) in vec.into_iter().enumerate() {
+        assert_eq!(i, x);
+    }
+}
+
+#[test]
+fn stress() {
+    let vec = boxcar::Vec::new();
+    let barrier = Barrier::new(6);
+
+    thread::scope(|s| {
+        s.spawn(|| {
+            barrier.wait();
+            for i in 0..1000 {
+                vec.push(i);
+            }
+        });
+
+        s.spawn(|| {
+            barrier.wait();
+            for i in 1000..2000 {
+                vec.push(i);
+            }
+        });
+
+        s.spawn(|| {
+            barrier.wait();
+            for i in 2000..3000 {
+                vec.push(i);
+            }
+        });
+
+        s.spawn(|| {
+            barrier.wait();
+            for i in 3000..4000 {
+                vec.push(i);
+            }
+        });
+
+        s.spawn(|| {
+            barrier.wait();
+            for i in 0..10_000 {
+                if let Some(&x) = vec.get(i) {
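+                    // `get` never exposes a slot that has been claimed but not yet
+                    // written, so any value observed here was pushed by a writer thread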
+                    assert!(x < 4000);
+                }
+            }
+        });
+
+        s.spawn(|| {
+            barrier.wait();
+            for (i, &x) in vec.iter() {
+                assert!(x < 4000);
+                assert!(vec[i] < 4000);
+            }
+        });
+    });
+
+    assert_eq!(vec.count(), 4000);
+    let mut sorted = vec.into_iter().collect::<Vec<_>>();
+    sorted.sort();
+    assert_eq!(sorted, (0..4000).collect::<Vec<_>>());
+}
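A standalone sketch (not part of the diff) of the new skip-based indexing: `Location::of` offsets the index by `SKIP`, uses the position of the highest set bit to pick a bucket, and clears that bit to get the offset within the bucket, so bucket `b` holds `1 << (b + SKIP_BUCKET)` entries. The `location` function below is hypothetical and only mirrors that arithmetic for illustration, assuming the new constants work out to `SKIP = 32` and `SKIP_BUCKET = 5`.

```rust
// hypothetical, self-contained mirror of `Location::of` from the diff above,
// assuming SKIP = 32 and SKIP_BUCKET = 5
const SKIP: usize = 32;
const SKIP_BUCKET: usize = 5;

// returns (bucket, entry) for a vector index
fn location(index: usize) -> (usize, usize) {
    let skipped = index + SKIP;
    // position of the highest set bit of `skipped`, minus the skipped buckets
    let bucket = (usize::BITS - skipped.leading_zeros()) as usize - (SKIP_BUCKET + 1);
    let bucket_len = 1usize << (bucket + SKIP_BUCKET);
    // clearing the top bit leaves the offset within the bucket
    (bucket, skipped ^ bucket_len)
}

fn main() {
    assert_eq!(location(0), (0, 0));   // bucket 0 holds indices 0..32
    assert_eq!(location(31), (0, 31));
    assert_eq!(location(32), (1, 0));  // bucket 1 holds indices 32..96
    assert_eq!(location(95), (1, 63));
    assert_eq!(location(96), (2, 0));  // bucket 2 holds indices 96..224
}
```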