From aa55039161736c88b103a520f83ce8f8ab66288f Mon Sep 17 00:00:00 2001 From: tower120 Date: Wed, 20 Oct 2021 16:55:09 +0300 Subject: [PATCH] start_position fix for out-of-order cleanup * miri spmc tests * start_position fix for out-of-order cleanup * total_capacity now O(1) * truncate_front logical fix * changelog update * on_new_chunk_cleanup lock fix --- .github/workflows/ci.yml | 2 +- CHANGELOG.md | 1 + Readme.md | 16 +++-- doc/principle-of-operation.md | 6 +- src/event_queue.rs | 130 +++++++++++++++++++--------------- src/event_queue/test.rs | 42 ++++++++++- src/event_reader.rs | 66 ++++++++--------- src/mpmc/event_queue.rs | 4 +- src/mpmc/event_reader.rs | 2 +- src/spmc/event_queue.rs | 2 +- src/tests/mpmc.rs | 77 ++++++++++++++++++-- src/tests/spmc.rs | 6 +- 12 files changed, 241 insertions(+), 113 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5ddbd10..9dff3aa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -54,7 +54,7 @@ jobs: steps: - uses: actions/checkout@v2 - name: Build doc - run: cargo doc --lib + run: RUSTFLAGS="--deny warnings" cargo doc --lib loom: runs-on: ubuntu-latest diff --git a/CHANGELOG.md b/CHANGELOG.md index 6f96e4f..b7abcbd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - clear/truncate_front now dispose chunks not occupied by readers immediately! Which, at least partially, solves "emergency cleanup" problem. Now you don't have to have access to all readers! - Subscribe/unsubscribe now O(1). +- EventQueue::total_capacity now O(1). ## 0.4.1 ### Added diff --git a/Readme.md b/Readme.md index e73b609..0e94e86 100644 --- a/Readme.md +++ b/Readme.md @@ -28,9 +28,11 @@ Read - per thread performance degrades slowly, with each additional simultaneous _(Also remember, since `rc_event_queue` is message queue, and each reader read ALL queue - adding more readers does not consume queue faster)_ -Write - per thread performance degrades close to linearly, with each additional simultaneously writing thread. +Write - per thread performance degrades almost linearly, with each additional simultaneously writing thread. (Due to being locked). Not applicable to `spmc`. +N.B. But if there is no heavy contention - performance very close to single-threaded case. + [See mpmc benchmarks](doc/mpmc_benchmarks.md). ### Principle of operation @@ -93,7 +95,7 @@ assert!(sum(reader2.iter()) == 1100); ### Emergency cut If any of the readers did not read for a long time - it can retain queue from cleanup. -This means that queue capacity will grow. On long run systems, you may want to periodically check `total_capacity`, +This means that queue capacity will grow. On long runs with unpredictable systems, you may want to periodically check `total_capacity`, and if it grows too much - you may want to force-cut/clear it. ```rust @@ -104,18 +106,20 @@ if event.total_capacity() > 100000{ event.truncate_front(1000); // leave some of the latest messages to read // If you set to Settings::MAX_CHUNK_SIZE to high value, - // This will reduce chunk size on next writes. + // this will reduce chunk size. event.change_chunk_size(2048); - // If you do have access to all readers - this will move readers forward, - // and free the rest of the chunks. + // If you DO have access to all readers (you probably don't) - + // this will move readers forward, and free the chunks occupied by readers. + // Under normal conditions, this is not necessary, since readers will jump + // forward to another chunk immediately on the next iter() call. 
for reader in readers{ reader.update_position(); // reader.iter(); // this have same effect as above } } - ``` +Even if some reader will stop read forever - you'll only lose/leak chunk directly occupied by reader. ### Optimisation diff --git a/doc/principle-of-operation.md b/doc/principle-of-operation.md index 29776bb..e9d2c5b 100644 --- a/doc/principle-of-operation.md +++ b/doc/principle-of-operation.md @@ -6,7 +6,7 @@ Writes happens under lock, and does not block reads. Single-threaded read performance is between `Vec` and `VecDeque` for best-case scenario; 1.5-2x slower then `VecDeque` - for worse. -Single-threaded write performance 4-5x slower then `VecDeque`. But writing with `EventQueue::extend` can give you `VecDeque`-like performance. +Single-threaded write performance 2-3x slower then `VecDeque`. But writing with `EventQueue::extend` can give you `VecDeque`-like performance. Memory-wise there is only fixed overhead. Each reader is just kind a pointer. @@ -128,7 +128,7 @@ If we have only one chunk left, and previous chunk had the same size - we found With `feature="double_buffering"` enabled, the biggest freed chunk will be stored for further reuse. -## Tracking readers. Out-of-order chunks dispose. +## Tracking readers. Out-of-order chunks disposal. ![](images/tracked_chunks.png) @@ -153,6 +153,8 @@ On the EventReader side, during chunk switch: - Out+=1, Next.In+=1 - Release mutex +Queue remains lockless up to the clear/truncate call, due to read lock. + ## Optimisation techniques _TODO: AUTO_CLEANUP=false_ diff --git a/src/event_queue.rs b/src/event_queue.rs index 29716a1..2e9c9a2 100644 --- a/src/event_queue.rs +++ b/src/event_queue.rs @@ -19,6 +19,7 @@ use crate::dynamic_chunk::{DynamicChunkRecycled}; use crate::{StartPositionEpoch}; /// This way you can control when chunk's memory deallocation happens. +/// _In addition, some operations may cause deallocations as well._ #[derive(PartialEq)] pub enum CleanupMode{ /// Cleanup will be called when chunk fully read. @@ -29,7 +30,7 @@ pub enum CleanupMode{ OnChunkRead, /// Cleanup will be called when new chunk created. OnNewChunk, - /// Cleanup will never be called. You should call [EventQueue::cleanup] manually. + /// Cleanup will never be called. You should call `EventQueue::cleanup` manually. Never } @@ -49,8 +50,9 @@ pub struct List{ first: *mut DynamicChunk, last : *mut DynamicChunk, chunk_id_counter: usize, + total_capacity: usize, - readers_count: usize, + readers_count: u32, /// 0 - means no penult penult_chunk_size: u32, @@ -65,7 +67,9 @@ pub struct EventQueue{ /// Separate lock from list::start_position_epoch, is safe, because start_point_epoch encoded in /// chunk's atomic len+epoch. - pub(crate) start_position: SpinMutex>, + // TODO: Make RWLock? Bench. 
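+    // None until the first clear/truncate_front call sets it;
+    // reset back to None again when the chunk it points to gets freed.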
+ // TODO: Optioned + pub(crate) start_position: SpinMutex>>, _pinned: PhantomPinned, } @@ -84,12 +88,13 @@ impl EventQueue last: null_mut(), chunk_id_counter: 0, readers_count:0, + total_capacity:new_capacity as usize, penult_chunk_size : 0, #[cfg(feature = "double_buffering")] free_chunk: None, }), - start_position: SpinMutex::new(Cursor{chunk: null(), index:0}), + start_position: SpinMutex::new(None), _pinned: PhantomPinned, }); @@ -100,7 +105,6 @@ impl EventQueue let event = &mut *(&*this as *const _ as *mut EventQueue); event.list.get_mut().first = node; event.list.get_mut().last = node; - event.start_position.get_mut().chunk = node; } this @@ -150,6 +154,7 @@ impl EventQueue node.set_next(new_node, Ordering::Release); list.last = new_node; list.penult_chunk_size = node.capacity() as u32; + list.total_capacity += size; unsafe{&mut *new_node} } @@ -159,8 +164,8 @@ impl EventQueue if S::CLEANUP == CleanupMode::OnNewChunk{ // this should acts as compile-time-if. if S::LOCK_ON_NEW_CHUNK_CLEANUP{ - // `cleanup` - locks internally - self.cleanup(); + let _lock = self.list.lock(); + self.cleanup_impl(list); } else { self.cleanup_impl(list); } @@ -283,7 +288,23 @@ impl EventQueue } } - unsafe fn free_chunk(chunk: *mut DynamicChunk, list: &mut List){ + unsafe fn free_chunk( + &self, + chunk: *mut DynamicChunk, + list: &mut List) + { + if let Some(start_position) = *self.start_position.as_mut_ptr(){ + if start_position.chunk == chunk{ + if LOCK_ON_WRITE_START_POSITION{ + *self.start_position.lock() = None; + } else { + *self.start_position.as_mut_ptr() = None; + } + } + } + + list.total_capacity -= (*chunk).capacity(); + #[cfg(not(feature = "double_buffering"))] { DynamicChunk::destruct(chunk); @@ -327,7 +348,9 @@ impl EventQueue debug_assert!(!next_chunk_ptr.is_null()); debug_assert!(std::ptr::eq(chunk, list.first)); - Self::free_chunk(chunk, list); + // Do not lock start_position permanently, because reader will + // never enter chunk before list.first + self.free_chunk::(chunk, list); list.first = next_chunk_ptr; Continue(()) @@ -339,19 +362,22 @@ impl EventQueue } } - /// This will traverse up to the start_point. And will do out-of-order cleanup. + /// This will traverse up to the start_point - and will free all unoccupied chunks. (out-of-order cleanup) /// This one slower then cleanup_impl. fn force_cleanup_impl(&self, list: &mut List){ self.cleanup_impl(list); - unsafe { - let terminal_chunk = (*self.start_position.as_mut_ptr()).chunk; - if terminal_chunk.is_null(){ - return; - } - if (*list.first).id() >= (*terminal_chunk).id(){ - return; - } + // Lock start_position permanently, due to out of order chunk destruction. + // Reader can try enter in the chunk in the middle of force_cleanup execution. + let start_position = self.start_position.lock(); + let terminal_chunk = match &*start_position{ + None => { return; } + Some(cursor) => {cursor.chunk} + }; + if list.first as *const _ == terminal_chunk{ + return; + } + unsafe { // cleanup_impl dealt with first chunk before. Omit. 
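+            // Walk the chunks between list.first and the start_position chunk,
+            // unlinking and freeing every one that no reader currently occupies.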
let mut prev_chunk = list.first; // using _ptr version, because with &chunk - reference should be valid during whole @@ -378,7 +404,7 @@ impl EventQueue (*prev_chunk).set_next(next_chunk_ptr, Ordering::Release); drop(lock); - Self::free_chunk(chunk, list); + self.free_chunk::(chunk, list); Continue(()) } ); @@ -395,7 +421,7 @@ impl EventQueue list: &mut List, new_start_position: Cursor) { - *self.start_position.lock() = new_start_position; + *self.start_position.lock() = Some(new_start_position); // update len_and_start_position_epoch in each chunk let first_chunk = unsafe{&mut *list.first}; @@ -415,11 +441,11 @@ impl EventQueue pub fn clear(&self, list: &mut List){ let last_chunk = unsafe{ &*list.last }; - let chunk_len = last_chunk.chunk_state(Ordering::Relaxed).len() as usize; + let last_chunk_len = last_chunk.chunk_state(Ordering::Relaxed).len() as usize; self.set_start_position(list, Cursor { chunk: last_chunk, - index: chunk_len + index: last_chunk_len }); self.force_cleanup_impl(list); @@ -427,29 +453,27 @@ impl EventQueue pub fn truncate_front(&self, list: &mut List, len: usize) { // make chunks* array - let chunks_count= unsafe { - list.chunk_id_counter/*(*list.last).id*/ - (*list.first).id() + 1 - }; // TODO: subtract from total_capacity // TODO: use small_vec // TODO: loop if > 128? // there is no way we can have memory enough to hold > 2^64 bytes. - debug_assert!(chunks_count<=128); let mut chunks : [*const DynamicChunk; 128] = [null(); 128]; - unsafe { - let mut i = 0; - foreach_chunk( - list.first, - null(), - Ordering::Relaxed, // we're under mutex - |chunk| { - chunks[i] = chunk; - i+=1; - Continue(()) - } - ); - } + let chunks_count= + unsafe { + let mut i = 0; + foreach_chunk( + list.first, + null(), + Ordering::Relaxed, // we're under mutex + |chunk| { + chunks[i] = chunk; + i+=1; + Continue(()) + } + ); + i + }; let mut total_len = 0; for i in (0..chunks_count).rev(){ @@ -457,10 +481,18 @@ impl EventQueue let chunk_len = chunk.chunk_state(Ordering::Relaxed).len() as usize; total_len += chunk_len; if total_len >= len{ - self.set_start_position(list, Cursor { + let new_start_position = Cursor { chunk: chunks[i], index: total_len - len - }); + }; + // Do we actually need to truncate? 
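+                    // If the current start_position is already at or past the requested
+                    // cut point, truncating would only move it backwards - keep it as is.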
+ if let Some(start_position) = unsafe{*self.start_position.as_mut_ptr()}{ + if start_position >= new_start_position{ + return; + } + } + + self.set_start_position(list, new_start_position); self.force_cleanup_impl(list); return; } @@ -476,22 +508,8 @@ impl EventQueue self.add_chunk_sized(&mut *list, new_capacity as usize); } - /// O(n) - /// TODO: store current capacity pub fn total_capacity(&self, list: &List) -> usize { - let mut total = 0; - unsafe { - foreach_chunk( - list.first, - null(), - Ordering::Relaxed, // we're under mutex - |chunk| { - total += chunk.capacity(); - Continue(()) - } - ); - } - total + list.total_capacity } pub fn chunk_capacity(&self, list: &List) -> usize { diff --git a/src/event_queue/test.rs b/src/event_queue/test.rs index cd8ec9d..434978e 100644 --- a/src/event_queue/test.rs +++ b/src/event_queue/test.rs @@ -1,9 +1,10 @@ use crate::mpmc::{EventQueue, EventReader, Settings, DefaultSettings}; -use crate::CleanupMode; +use crate::{CleanupMode, LendingIterator}; use std::ptr::null; use std::ops::ControlFlow::Continue; use std::ops::Deref; use itertools::assert_equal; +use rand::Rng; use crate::event_queue::{foreach_chunk, List}; use crate::sync::Ordering; use crate::tests::utils::{consume_copies, skip}; @@ -40,6 +41,23 @@ fn get_chunks_lens(event_queue: &EventQueue) -> Vec chunk_lens } +fn factual_capacity(event_queue: &EventQueue) -> usize { + let list = &event_queue.0.list.lock(); + let mut total = 0; + unsafe { + foreach_chunk( + list.first, + null(), + Ordering::Relaxed, // we're under mutex + |chunk| { + total += chunk.capacity(); + Continue(()) + } + ); + } + total +} + #[test] fn chunks_size_test(){ @@ -160,6 +178,28 @@ fn capacity_test(){ assert_eq!(event.total_capacity(), get_chunks_capacities(&event).iter().sum()); } +#[test] +fn fuzzy_capacity_size_test(){ + use rand::Rng; + let mut rng = rand::thread_rng(); + let size_bound = if cfg!(miri){ 1000 } else { 100000 }; + let read_bound = if cfg!(miri){ 100 } else { 10000 }; + for _ in 0..100{ + let size = rng.gen_range(0..size_bound); + let event = EventQueue::::new(); + let mut reader = EventReader::new(&event); + event.extend(0..size); + { + let mut iter = reader.iter(); + for _ in 0..rng.gen_range(0..read_bound){ + iter.next(); + } + } + + assert_eq!(event.total_capacity(), factual_capacity(&event)); + } +} + #[test] #[allow(non_snake_case)] fn CleanupMode_OnNewChunk_test(){ diff --git a/src/event_reader.rs b/src/event_reader.rs index 5bcb436..f90fcc2 100644 --- a/src/event_reader.rs +++ b/src/event_reader.rs @@ -21,47 +21,43 @@ unsafe impl Send for EventReader{} impl EventReader { - #[inline] - fn fast_forward( - &mut self, - new_position: Cursor, - try_cleanup: bool /*should be generic const*/ - ){ - // 1. Enter new_position chunk - let new_chunk = unsafe{&*new_position.chunk}; - new_chunk.readers_entered().fetch_add(1, Ordering::AcqRel); - - // 2. Mark current chunk read - let chunk = unsafe{&*self.position.chunk}; - if /*constexpr*/ try_cleanup { - let event = chunk.event(); - let readers_entered = chunk.readers_entered().load(Ordering::Acquire); - - // MORE or equal, just in case (this MT...). This check is somewhat opportunistic. - let prev_read = chunk.read_completely_times().fetch_add(1, Ordering::AcqRel); - if prev_read+1 >= readers_entered{ - event.cleanup(); - } - } else { - chunk.read_completely_times().fetch_add(1, Ordering::AcqRel); - } - - // 3. Change position - self.position = new_position; - } - // Have much better performance being non-inline. Occurs rarely. 
// This is the only reason this code - is a function. #[inline(never)] #[cold] fn do_update_start_position_and_get_chunk_state(&mut self) -> PackedChunkState { let event = unsafe{(*self.position.chunk).event()}; - let new_start_position = event.start_position.lock().clone(); - if self.position < new_start_position { - self.fast_forward( - new_start_position, - S::CLEANUP == CleanupMode::OnChunkRead - ); + + // fast forward + { + let start_position_lock = event.start_position.lock(); + if let Some(start_position) = *start_position_lock{ + if self.position < start_position { + + // 1. Enter new_position chunk + let new_chunk = unsafe{&*start_position.chunk}; + new_chunk.readers_entered().fetch_add(1, Ordering::AcqRel); + + // 2. Mark current chunk read + let chunk = unsafe{&*self.position.chunk}; + if /*constexpr*/ S::CLEANUP == CleanupMode::OnChunkRead { + let event = chunk.event(); + let readers_entered = chunk.readers_entered().load(Ordering::Acquire); + + // MORE or equal, just in case (this MT...). This check is somewhat opportunistic. + let prev_read = chunk.read_completely_times().fetch_add(1, Ordering::AcqRel); + if prev_read+1 >= readers_entered{ + drop(start_position_lock); + event.cleanup(); + } + } else { + chunk.read_completely_times().fetch_add(1, Ordering::AcqRel); + } + + // 3. Change position + self.position = start_position; + } + } } unsafe{&*self.position.chunk}.chunk_state(Ordering::Acquire) diff --git a/src/mpmc/event_queue.rs b/src/mpmc/event_queue.rs index 8da0a8a..7b11439 100644 --- a/src/mpmc/event_queue.rs +++ b/src/mpmc/event_queue.rs @@ -53,7 +53,7 @@ impl EventQueue{ /// "Lazily move" all readers positions to the "end of the queue". From readers perspective, /// equivalent to conventional `clear`. /// - /// Immediately free all **unoccupied** chunks. + /// Immediately free all chunks, **unoccupied** by readers. /// /// "End of the queue" - is the queue's end position at the moment of the `clear` call. /// @@ -68,7 +68,7 @@ impl EventQueue{ /// "Lazily move" all readers positions to the `len`-th element from the end of the queue. /// From readers perspective, equivalent to conventional `truncate` from the other side. /// - /// Immediately free **unoccupied** chunks. + /// Immediately free chunks, **unoccupied** by readers. /// /// "Lazy move" - means that reader actually change position and free occupied chunk, /// only when actual read starts. diff --git a/src/mpmc/event_reader.rs b/src/mpmc/event_reader.rs index da3b51a..6a2cca5 100644 --- a/src/mpmc/event_reader.rs +++ b/src/mpmc/event_reader.rs @@ -28,7 +28,7 @@ impl EventReader{ /// This is consuming iterator. Return references. /// Iterator items references should not outlive iterator. /// - /// Read counters of affected chunks updated in [Iter::drop]. + /// Read counters of affected chunks updated in `Iter::drop`. #[inline] pub fn iter(&mut self) -> Iter{ Iter{ 0: self.0.iter() } diff --git a/src/spmc/event_queue.rs b/src/spmc/event_queue.rs index 1683d8f..c20bb00 100644 --- a/src/spmc/event_queue.rs +++ b/src/spmc/event_queue.rs @@ -6,7 +6,7 @@ use crate::CleanupMode; /// See [mpmc](crate::mpmc::EventQueue) documentation. /// -/// Only [cleanup] and `unsubscribe`(on [EventReader::drop]) are synchronized. +/// Only [cleanup](EventQueue::cleanup) and `unsubscribe`(on `EventReader::drop`) are synchronized. /// Everything else - overhead free. /// /// Insert performance in the `std::vec::Vec` league. 
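A usage sketch of the `truncate_front` semantics described in the doc comments above (illustration only, not part of the patch; the imports, `EventQueue::<usize>::new()` with default settings, and the crate-root `LendingIterator` re-export are assumed from the crate's README and the tests that follow):

```rust
use rc_event_queue::mpmc::{EventQueue, EventReader};
use rc_event_queue::LendingIterator;

fn main() {
    let event = EventQueue::<usize>::new();
    let mut reader = EventReader::new(&event);

    event.extend(0..1000);

    // Sets start_position and immediately frees the chunks
    // not occupied by any reader.
    event.truncate_front(100);

    // The reader is only "lazily moved": it fast-forwards to the new
    // start_position on its next read, so the last 100 items remain.
    let mut remaining = 0;
    let mut iter = reader.iter();
    while let Some(_item) = iter.next() {
        remaining += 1;
    }
    assert_eq!(remaining, 100);

    // total_capacity is O(1) after this patch and shrinks as chunks are freed.
    assert!(event.total_capacity() > 0);
}
```

The reader keeps its old position until the next read; only then does it jump past the truncated chunks and release the one it occupied.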
diff --git a/src/tests/mpmc.rs b/src/tests/mpmc.rs index 4942a5c..86f71d5 100644 --- a/src/tests/mpmc.rs +++ b/src/tests/mpmc.rs @@ -178,7 +178,74 @@ fn clear_test() { } #[test] -#[cfg(not(miri))] +#[cfg(any(not(miri), not(target_os = "windows")))] +fn mt_push_truncate_test() { +for _ in 0..if cfg!(miri){1} else {100}{ + struct S{} impl Settings for S{ + const MAX_CHUNK_SIZE: u32 = 256; + } + + let event = EventQueue::::new(); + + let mut readers = Vec::new(); + for _ in 0..2{ + readers.push(EventReader::new(&event)); + } + + let writer_thread = { + let event = event.clone(); + Box::new(thread::spawn(move || { + for i in 0..10000{ + event.push(i); + } + })) + }; + + let stop_clear_flag = Arc::new(AtomicBool::new(false)); + let clear_thread = { + let event = event.clone(); + let stop_clear_flag = stop_clear_flag.clone(); + Box::new(thread::spawn(move || { + let mut i = 0; + loop { + let stop = stop_clear_flag.load(Ordering::Acquire); + if stop{ + break; + } + + if i == 1000{ + event.truncate_front(100); + i = 0; + } + i += 1; + std::hint::spin_loop(); + } + })) + }; + + // read + let mut threads = Vec::new(); + for mut reader in readers{ + let thread = Box::new(thread::spawn(move || { + // some work here + let _local_sum: usize = consume_copies(&mut reader.iter()).iter().sum(); + })); + threads.push(thread); + } + + writer_thread.join().unwrap(); + + stop_clear_flag.store(true, Ordering::Release); + clear_thread.join().unwrap(); + + for thread in threads{ + thread.join().unwrap(); + } +} +} + +#[test] +#[cfg(any(not(miri), not(target_os = "windows")))] fn mt_read_test() { for _ in 0..10{ struct S{} impl Settings for S{ @@ -186,15 +253,15 @@ fn mt_read_test() { const MAX_CHUNK_SIZE: u32 = 512; const CLEANUP: CleanupMode = DefaultSettings::CLEANUP; } - mt_read_test_impl::(4, 1000000); + mt_read_test_impl::(4, if cfg!(miri){ 1000 } else { 1000000 }); } } #[test] -#[cfg(not(miri))] +#[cfg(any(not(miri), not(target_os = "windows")))] fn mt_write_read_test() { -for _ in 0..100{ - let writer_chunk = 10000; +for _ in 0..if cfg!(miri){10} else {100} { + let writer_chunk = if cfg!(miri){ 1000 } else { 10000 }; let writers_thread_count = 2; let readers_thread_count = 4; struct S{} impl Settings for S{ diff --git a/src/tests/spmc.rs b/src/tests/spmc.rs index fb0ea3b..1976852 100644 --- a/src/tests/spmc.rs +++ b/src/tests/spmc.rs @@ -17,10 +17,10 @@ fn basic_test(){ } #[test] -#[cfg(not(miri))] +#[cfg(any(not(miri), not(target_os = "windows")))] fn mt_write_read_test() { -for _ in 0..100{ - let queue_size = 10000; +for _ in 0..if cfg!(miri){10} else {100} { + let queue_size = if cfg!(miri){ 1000 } else { 10000 }; let readers_thread_count = 4; struct S{} impl Settings for S{ const MIN_CHUNK_SIZE: u32 = 32;
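A side note on the test gates introduced above (illustration only, not part of the patch; the function name is hypothetical): `#[cfg(any(not(miri), not(target_os = "windows")))]` excludes a test only when both conditions hold at once, i.e. when running under Miri on Windows. An equivalent spelling, together with the workload-bounding pattern the tests use:

```rust
// Equivalent to #[cfg(any(not(miri), not(target_os = "windows")))]:
// compile the test everywhere except under Miri on Windows.
#[cfg(not(all(miri, target_os = "windows")))]
#[test]
fn gated_mt_test_example() {
    // Shrink the workload when interpreted by Miri, as the tests above do.
    let iterations = if cfg!(miri) { 10 } else { 100 };
    for _ in 0..iterations {
        // the actual multi-threaded body would go here
    }
}
```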