start_position fix for out-of-order cleanup
* miri spmc tests
* start_position fix for out-of-order cleanup
* total_capacity now O(1)
* truncate_front logical fix
* changelog update
* on_new_chunk_cleanup lock fix
tower120 authored Oct 20, 2021
1 parent 0648b01 commit aa55039
Showing 12 changed files with 241 additions and 113 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -54,7 +54,7 @@ jobs:
steps:
- uses: actions/checkout@v2
- name: Build doc
run: cargo doc --lib
run: RUSTFLAGS="--deny warnings" cargo doc --lib

loom:
runs-on: ubuntu-latest
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
- clear/truncate_front now dispose chunks not occupied by readers immediately! This, at least partially, solves the "emergency cleanup" problem.
Now you no longer need access to all readers!
- Subscribe/unsubscribe now O(1).
- EventQueue::total_capacity now O(1).

## 0.4.1
### Added
16 changes: 10 additions & 6 deletions Readme.md
@@ -28,9 +28,11 @@ Read - per thread performance degrades slowly, with each additional simultaneous reading thread.
_(Also remember: since `rc_event_queue` is a message queue and each reader reads ALL of the queue,
adding more readers does not make the queue drain faster)_

Write - per thread performance degrades close to linearly, with each additional simultaneously writing thread.
Write - per thread performance degrades almost linearly, with each additional simultaneously writing thread.
(Due to writes being serialized under a lock.) Not applicable to `spmc`.

N.B. If there is no heavy contention, performance is very close to the single-threaded case.

[See mpmc benchmarks](doc/mpmc_benchmarks.md).
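
A minimal usage sketch of the claim above: every reader independently observes every event, so adding readers never drains the queue faster. The constructor, `subscribe`, `extend`, and the shape of the reader's iterator are assumptions based on the examples in this README; exact signatures in the crate may differ (extra `Settings` parameters, the reader iterator being a lending iterator from the crate's prelude).

```rust
use rc_event_queue::prelude::*;
use rc_event_queue::mpmc::{EventQueue, EventReader};

// Hypothetical helper: count the events a reader sees from its current position.
fn drain_count(reader: &mut EventReader<usize>) -> usize {
    let mut n = 0;
    let mut iter = reader.iter();
    while let Some(_item) = iter.next() {
        n += 1;
    }
    n
}

fn main() {
    let event = EventQueue::<usize>::new();
    let mut reader1 = event.subscribe();
    let mut reader2 = event.subscribe();

    event.extend(0..1000);

    // Both readers see all 1000 events - they do not split the queue between them.
    assert_eq!(drain_count(&mut reader1), 1000);
    assert_eq!(drain_count(&mut reader2), 1000);
}
```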

### Principle of operation
@@ -93,7 +95,7 @@ assert!(sum(reader2.iter()) == 1100);
### Emergency cut

If any reader has not read for a long time, it can keep the queue from being cleaned up.
This means that queue capacity will grow. On long run systems, you may want to periodically check `total_capacity`,
This means that the queue capacity will grow. On long-running, unpredictable systems you may want to periodically check `total_capacity`,
and if it grows too much, force-cut/clear it.

```rust
@@ -104,18 +106,20 @@ if event.total_capacity() > 100000{
event.truncate_front(1000); // leave some of the latest messages to read

// If you set Settings::MAX_CHUNK_SIZE to a high value,
// This will reduce chunk size on next writes.
// this will reduce chunk size.
event.change_chunk_size(2048);

// If you do have access to all readers - this will move readers forward,
// and free the rest of the chunks.
// If you DO have access to all readers (you probably don't) -
// this will move readers forward, and free the chunks occupied by readers.
// Under normal conditions, this is not necessary, since readers will jump
// forward to another chunk immediately on the next iter() call.
for reader in readers{
reader.update_position();
// reader.iter(); // has the same effect as the call above
}
}

```
Even if some reader stops reading forever, you'll only lose/leak the chunk directly occupied by that reader.

### Optimisation

6 changes: 4 additions & 2 deletions doc/principle-of-operation.md
@@ -6,7 +6,7 @@ Writes happens under lock, and does not block reads.

Single-threaded read performance is between `Vec` and `VecDeque` in the best-case scenario, and 1.5-2x slower than `VecDeque` in the worst.

Single-threaded write performance is 4-5x slower than `VecDeque`. But writing with `EventQueue::extend` can give you `VecDeque`-like performance.
Single-threaded write performance is 2-3x slower than `VecDeque`. But writing with `EventQueue::extend` can give you `VecDeque`-like performance.

Memory-wise there is only a fixed overhead. Each reader is essentially just a pointer.
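
A hedged sketch of the `EventQueue::extend` point above: batching writes amortizes the write lock and chunk bookkeeping over the whole batch, which is where the `VecDeque`-like throughput comes from. `new`, `push`, and `extend` are used here as named in this documentation; exact signatures in the crate may differ.

```rust
use rc_event_queue::mpmc::EventQueue;

fn main() {
    let event = EventQueue::<usize>::new();

    // One-by-one writes: every push goes through the locked write path separately.
    for i in 0..100_000 {
        event.push(i);
    }

    // Batched write: a single extend call pays the locking/bookkeeping once
    // for the whole range - this is the VecDeque-like fast path.
    event.extend(0..100_000);
}
```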

@@ -128,7 +128,7 @@ If we have only one chunk left, and previous chunk had the same size - we found

With `feature="double_buffering"` enabled, the biggest freed chunk will be stored for further reuse.

## Tracking readers. Out-of-order chunks dispose.
## Tracking readers. Out-of-order chunks disposal.

![](images/tracked_chunks.png)

@@ -153,6 +153,8 @@ On the EventReader side, during chunk switch:
- Out+=1, Next.In+=1
- Release mutex

The queue remains lockless up to the clear/truncate call, due to the read lock.
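
The chunk-tracking scheme above can be modelled in isolation. The sketch below is an illustrative stand-alone model, not the crate's code (the real implementation updates these counters under a mutex during the chunk switch, as the list above describes): a chunk behind the read frontier can be disposed out of order once every reader that entered it has also left it, i.e. `in == out`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Per-chunk reader-tracking counters, mirroring the In/Out description above.
struct ChunkReaders {
    readers_in: AtomicUsize,  // readers that have entered this chunk
    readers_out: AtomicUsize, // readers that have left this chunk
}

impl ChunkReaders {
    const fn new() -> Self {
        Self {
            readers_in: AtomicUsize::new(0),
            readers_out: AtomicUsize::new(0),
        }
    }

    /// Reader switches from `self` to `next`: Out += 1, Next.In += 1.
    fn switch_to(&self, next: &ChunkReaders) {
        self.readers_out.fetch_add(1, Ordering::Release);
        next.readers_in.fetch_add(1, Ordering::Release);
    }

    /// A chunk behind the read frontier is disposable once everyone
    /// who entered it has also left it.
    fn unoccupied(&self) -> bool {
        self.readers_in.load(Ordering::Acquire) == self.readers_out.load(Ordering::Acquire)
    }
}

fn main() {
    let chunk_a = ChunkReaders::new();
    let chunk_b = ChunkReaders::new();

    // Two readers start inside chunk A.
    chunk_a.readers_in.fetch_add(2, Ordering::Relaxed);

    chunk_a.switch_to(&chunk_b);    // reader 1 moves to chunk B
    assert!(!chunk_a.unoccupied()); // reader 2 is still inside A

    chunk_a.switch_to(&chunk_b);    // reader 2 moves to chunk B
    assert!(chunk_a.unoccupied());  // A can now be freed out of order
}
```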

## Optimisation techniques

_TODO: AUTO_CLEANUP=false_
130 changes: 74 additions & 56 deletions src/event_queue.rs
@@ -19,6 +19,7 @@ use crate::dynamic_chunk::{DynamicChunkRecycled};
use crate::{StartPositionEpoch};

/// This way you can control when chunk's memory deallocation happens.
/// _In addition, some operations may cause deallocations as well._
#[derive(PartialEq)]
pub enum CleanupMode{
/// Cleanup will be called when chunk fully read.
@@ -29,7 +30,7 @@ pub enum CleanupMode{
OnChunkRead,
/// Cleanup will be called when new chunk created.
OnNewChunk,
/// Cleanup will never be called. You should call [EventQueue::cleanup] manually.
/// Cleanup will never be called. You should call `EventQueue::cleanup` manually.
Never
}

Expand All @@ -49,8 +50,9 @@ pub struct List<T, S: Settings>{
first: *mut DynamicChunk<T, S>,
last : *mut DynamicChunk<T, S>,
chunk_id_counter: usize,
total_capacity: usize,

readers_count: usize,
readers_count: u32,

/// 0 - means no penult
penult_chunk_size: u32,
@@ -65,7 +67,9 @@ pub struct EventQueue<T, S: Settings>{

/// A lock separate from list::start_position_epoch is safe, because start_point_epoch is encoded in the
/// chunk's atomic len+epoch.
pub(crate) start_position: SpinMutex<Cursor<T, S>>,
// TODO: Make RWLock? Bench.
// TODO: Optioned
pub(crate) start_position: SpinMutex<Option<Cursor<T, S>>>,

_pinned: PhantomPinned,
}
@@ -84,12 +88,13 @@ impl<T, S: Settings> EventQueue<T, S>
last: null_mut(),
chunk_id_counter: 0,
readers_count:0,
total_capacity:new_capacity as usize,
penult_chunk_size : 0,

#[cfg(feature = "double_buffering")]
free_chunk: None,
}),
start_position: SpinMutex::new(Cursor{chunk: null(), index:0}),
start_position: SpinMutex::new(None),
_pinned: PhantomPinned,
});

@@ -100,7 +105,6 @@ impl<T, S: Settings> EventQueue<T, S>
let event = &mut *(&*this as *const _ as *mut EventQueue<T, S>);
event.list.get_mut().first = node;
event.list.get_mut().last = node;
event.start_position.get_mut().chunk = node;
}

this
@@ -150,6 +154,7 @@ impl<T, S: Settings> EventQueue<T, S>
node.set_next(new_node, Ordering::Release);
list.last = new_node;
list.penult_chunk_size = node.capacity() as u32;
list.total_capacity += size;

unsafe{&mut *new_node}
}
@@ -159,8 +164,8 @@ impl<T, S: Settings> EventQueue<T, S>
if S::CLEANUP == CleanupMode::OnNewChunk{
// this should act as a compile-time if.
if S::LOCK_ON_NEW_CHUNK_CLEANUP{
// `cleanup` - locks internally
self.cleanup();
let _lock = self.list.lock();
self.cleanup_impl(list);
} else {
self.cleanup_impl(list);
}
@@ -283,7 +288,23 @@ impl<T, S: Settings> EventQueue<T, S>
}
}

unsafe fn free_chunk(chunk: *mut DynamicChunk<T, S>, list: &mut List<T, S>){
unsafe fn free_chunk<const LOCK_ON_WRITE_START_POSITION: bool>(
&self,
chunk: *mut DynamicChunk<T, S>,
list: &mut List<T, S>)
{
if let Some(start_position) = *self.start_position.as_mut_ptr(){
if start_position.chunk == chunk{
if LOCK_ON_WRITE_START_POSITION{
*self.start_position.lock() = None;
} else {
*self.start_position.as_mut_ptr() = None;
}
}
}

list.total_capacity -= (*chunk).capacity();

#[cfg(not(feature = "double_buffering"))]
{
DynamicChunk::destruct(chunk);
@@ -327,7 +348,9 @@ impl<T, S: Settings> EventQueue<T, S>
debug_assert!(!next_chunk_ptr.is_null());

debug_assert!(std::ptr::eq(chunk, list.first));
Self::free_chunk(chunk, list);
// Do not lock start_position permanently, because a reader will
// never enter a chunk before list.first
self.free_chunk::<true>(chunk, list);
list.first = next_chunk_ptr;

Continue(())
@@ -339,19 +362,22 @@ impl<T, S: Settings> EventQueue<T, S>
}
}

/// This will traverse up to the start_point. And will do out-of-order cleanup.
/// This will traverse up to the start_point - and will free all unoccupied chunks. (out-of-order cleanup)
/// This one is slower than cleanup_impl.
fn force_cleanup_impl(&self, list: &mut List<T, S>){
self.cleanup_impl(list);
unsafe {
let terminal_chunk = (*self.start_position.as_mut_ptr()).chunk;
if terminal_chunk.is_null(){
return;
}
if (*list.first).id() >= (*terminal_chunk).id(){
return;
}

// Lock start_position permanently, due to out of order chunk destruction.
// A reader can try to enter the chunk in the middle of force_cleanup execution.
let start_position = self.start_position.lock();
let terminal_chunk = match &*start_position{
None => { return; }
Some(cursor) => {cursor.chunk}
};
if list.first as *const _ == terminal_chunk{
return;
}
unsafe {
// cleanup_impl dealt with first chunk before. Omit.
let mut prev_chunk = list.first;
// using _ptr version, because with &chunk - reference should be valid during whole
@@ -378,7 +404,7 @@ impl<T, S: Settings> EventQueue<T, S>
(*prev_chunk).set_next(next_chunk_ptr, Ordering::Release);
drop(lock);

Self::free_chunk(chunk, list);
self.free_chunk::<false>(chunk, list);
Continue(())
}
);
@@ -395,7 +421,7 @@ impl<T, S: Settings> EventQueue<T, S>
list: &mut List<T, S>,
new_start_position: Cursor<T, S>)
{
*self.start_position.lock() = new_start_position;
*self.start_position.lock() = Some(new_start_position);

// update len_and_start_position_epoch in each chunk
let first_chunk = unsafe{&mut *list.first};
@@ -415,52 +441,58 @@

pub fn clear(&self, list: &mut List<T, S>){
let last_chunk = unsafe{ &*list.last };
let chunk_len = last_chunk.chunk_state(Ordering::Relaxed).len() as usize;
let last_chunk_len = last_chunk.chunk_state(Ordering::Relaxed).len() as usize;

self.set_start_position(list, Cursor {
chunk: last_chunk,
index: chunk_len
index: last_chunk_len
});

self.force_cleanup_impl(list);
}

pub fn truncate_front(&self, list: &mut List<T, S>, len: usize) {
// make chunks* array
let chunks_count= unsafe {
list.chunk_id_counter/*(*list.last).id*/ - (*list.first).id() + 1
};

// TODO: subtract from total_capacity
// TODO: use small_vec
// TODO: loop if > 128?
// there is no way we can have memory enough to hold > 2^64 bytes.
debug_assert!(chunks_count<=128);
let mut chunks : [*const DynamicChunk<T, S>; 128] = [null(); 128];
unsafe {
let mut i = 0;
foreach_chunk(
list.first,
null(),
Ordering::Relaxed, // we're under mutex
|chunk| {
chunks[i] = chunk;
i+=1;
Continue(())
}
);
}
let chunks_count=
unsafe {
let mut i = 0;
foreach_chunk(
list.first,
null(),
Ordering::Relaxed, // we're under mutex
|chunk| {
chunks[i] = chunk;
i+=1;
Continue(())
}
);
i
};

let mut total_len = 0;
for i in (0..chunks_count).rev(){
let chunk = unsafe{ &*chunks[i] };
let chunk_len = chunk.chunk_state(Ordering::Relaxed).len() as usize;
total_len += chunk_len;
if total_len >= len{
self.set_start_position(list, Cursor {
let new_start_position = Cursor {
chunk: chunks[i],
index: total_len - len
});
};
// Do we actually need to truncate?
if let Some(start_position) = unsafe{*self.start_position.as_mut_ptr()}{
if start_position >= new_start_position{
return;
}
}

self.set_start_position(list, new_start_position);
self.force_cleanup_impl(list);
return;
}
@@ -476,22 +508,8 @@ impl<T, S: Settings> EventQueue<T, S>
self.add_chunk_sized(&mut *list, new_capacity as usize);
}

/// O(n)
/// TODO: store current capacity
pub fn total_capacity(&self, list: &List<T, S>) -> usize {
let mut total = 0;
unsafe {
foreach_chunk(
list.first,
null(),
Ordering::Relaxed, // we're under mutex
|chunk| {
total += chunk.capacity();
Continue(())
}
);
}
total
list.total_capacity
}

pub fn chunk_capacity(&self, list: &List<T, S>) -> usize {
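
Taken together, the `event_queue.rs` changes above turn `total_capacity` into a running counter: it is increased where a new chunk is added and decreased in `free_chunk`, so the query itself becomes O(1). The sketch below is a stand-alone illustrative model of that invariant, not the crate's actual types:

```rust
/// Minimal model of the O(1) total_capacity bookkeeping: a running sum is
/// updated at the only two places where capacity can change.
struct ChunkList {
    chunk_capacities: Vec<usize>, // stand-in for the real chunk linked list
    total_capacity: usize,        // maintained incrementally, read in O(1)
}

impl ChunkList {
    fn new() -> Self {
        Self { chunk_capacities: Vec::new(), total_capacity: 0 }
    }

    /// Mirrors the diff's `list.total_capacity += size;` when a chunk is added.
    fn add_chunk(&mut self, capacity: usize) {
        self.chunk_capacities.push(capacity);
        self.total_capacity += capacity;
    }

    /// Mirrors the diff's `list.total_capacity -= (*chunk).capacity();` in free_chunk.
    fn free_front_chunk(&mut self) {
        if !self.chunk_capacities.is_empty() {
            let freed = self.chunk_capacities.remove(0);
            self.total_capacity -= freed;
        }
    }

    /// O(1), instead of traversing every chunk as the removed code did.
    fn total_capacity(&self) -> usize {
        self.total_capacity
    }
}

fn main() {
    let mut list = ChunkList::new();
    list.add_chunk(512);
    list.add_chunk(1024);
    assert_eq!(list.total_capacity(), 1536);

    list.free_front_chunk();
    assert_eq!(list.total_capacity(), 1024);
}
```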