truncate_front added. extend reworked.
tower120 committed Sep 15, 2021
1 parent 3c7d8fe commit a5fc450
Showing 5 changed files with 160 additions and 25 deletions.
10 changes: 10 additions & 0 deletions src/chunk.rs
@@ -58,6 +58,16 @@ impl<T, const CHUNK_SIZE: usize> ChunkStorage<T, CHUNK_SIZE> {
return Result::Ok(());
}

+#[inline(always)]
+pub unsafe fn push_unchecked(&mut self, value: T, store_ordering: Ordering){
+// Relaxed because updated only with &mut self
+let len_and_epoch: LenAndEpoch = self.len_and_start_position_epoch.load(Ordering::Relaxed).into();
+let index = len_and_epoch.len();
+let epoch = len_and_epoch.epoch();

+self.push_at(value, index, epoch, store_ordering);
+}

#[inline(always)]
pub(super) unsafe fn push_at(&mut self, value: T, index: u32, epoch: u32, store_ordering: Ordering) {
debug_assert!((index as usize) < CHUNK_SIZE);
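The new `push_unchecked` moves the capacity check to the caller: it is only sound when the target chunk is known to have room. A minimal sketch of the intended call pattern, assuming a freshly created chunk (which is empty by construction), mirroring how the reworked `push` in `src/event_queue.rs` below uses it:

    // Sketch: sound because a brand-new chunk has len == 0 < CHUNK_SIZE.
    let chunk = self.add_chunk(&mut *list);
    unsafe {
        // Safety: the chunk was just created and cannot be full.
        chunk.storage.push_unchecked(value, Ordering::Release);
    }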
109 changes: 88 additions & 21 deletions src/event_queue.rs
@@ -87,7 +87,7 @@ pub struct EventQueue<T, const CHUNK_SIZE: usize, const AUTO_CLEANUP: bool>{
/// chunk's atomic len+epoch.
pub(super) start_position: SpinMutex<Cursor<T, CHUNK_SIZE, AUTO_CLEANUP>>,

-pinned: PhantomPinned,
+_pinned: PhantomPinned,
}

unsafe impl<T, const CHUNK_SIZE : usize, const AUTO_CLEANUP: bool> Send for EventQueue<T, CHUNK_SIZE, AUTO_CLEANUP>{}
@@ -103,7 +103,7 @@ impl<T, const CHUNK_SIZE : usize, const AUTO_CLEANUP: bool> EventQueue<T, CHUNK_
list : Mutex::new(List{first: node_ptr, last: node_ptr, chunk_id_counter: 0}),
readers : AtomicUsize::new(0),
start_position: SpinMutex::new(Cursor{chunk: node_ptr, index:0}),
-pinned: PhantomPinned,
+_pinned: PhantomPinned,
});
unsafe {(*node_ptr).event = &*this};
this
@@ -126,7 +126,7 @@ impl<T, const CHUNK_SIZE : usize, const AUTO_CLEANUP: bool> EventQueue<T, CHUNK_
unsafe{&mut *new_node_ptr}
}

-// Leave this for a while. Have filling that this one should be faster.
+// Leave this for a while. Have feeling that this one should be faster.
// #[inline]
// pub fn push(&self, value: T){
// let mut list = self.list.lock();
@@ -151,13 +151,14 @@ impl<T, const CHUNK_SIZE : usize, const AUTO_CLEANUP: bool> EventQueue<T, CHUNK_
let node = unsafe{&mut *list.last};

if let Err(err) = node.storage.try_push(value, Ordering::Release){
-let res = self.add_chunk(&mut *list)
-.storage.try_push(err.value, Ordering::Release);
-debug_assert!(res.is_ok());
+unsafe {
+self.add_chunk(&mut *list)
+.storage.push_unchecked(err.value, Ordering::Release);
+}
}
}

-// Not a Extend trait, because Extend::extend(&mut self)
+// Not an Extend trait, because Extend::extend(&mut self)
#[inline]
pub fn extend<I>(&self, iter: I)
where I: IntoIterator<Item = T>
@@ -168,7 +168,14 @@ impl<T, const CHUNK_SIZE : usize, const AUTO_CLEANUP: bool> EventQueue<T, CHUNK_
let mut iter = iter.into_iter();

while node.storage.extend(&mut iter, Ordering::Release).is_err(){
-node = self.add_chunk(&mut *list);
+match iter.next() {
+None => {return;}
+Some(value) => {
+// add chunk and push value there
+node = self.add_chunk(&mut *list);
+unsafe{ node.storage.push_unchecked(value, Ordering::Relaxed); }
+}
+};
}
}
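For illustration, a small usage sketch consistent with the tests added in this commit (chunk size 4, so ten items span three chunks):

    let event = EventQueue::<usize, 4, true>::new();
    event.extend(0..10);                 // fills [0..4), [4..8), [8..10)
    assert_eq!(event.chunks_count(), 3); // two full chunks plus one partial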

@@ -231,11 +239,7 @@ impl<T, const CHUNK_SIZE : usize, const AUTO_CLEANUP: bool> EventQueue<T, CHUNK_
}
}

-/// Free all completely read chunks.
-/// Called automatically with AUTO_CLEANUP = true.
-pub fn cleanup(&self){
-let mut list = self.list.lock();
-
+fn cleanup_impl(&self, mut list: MutexGuard<List<T, CHUNK_SIZE, AUTO_CLEANUP>>){
let readers_count = self.readers.load(Ordering::Relaxed);
unsafe {
foreach_chunk(
@@ -259,33 +263,96 @@ impl<T, const CHUNK_SIZE : usize, const AUTO_CLEANUP: bool> EventQueue<T, CHUNK_
}
}

+/// Free all completely read chunks.
+/// Called automatically with AUTO_CLEANUP = true.
+pub fn cleanup(&self){
+self.cleanup_impl(self.list.lock());
+}

+#[inline]
+fn set_start_position(
+&self,
+list: MutexGuard<List<T, CHUNK_SIZE, AUTO_CLEANUP>>,
+new_start_position: Cursor<T, CHUNK_SIZE, AUTO_CLEANUP>)
+{
+*self.start_position.lock() = new_start_position;

+// update len_and_start_position_epoch in each chunk
+let new_epoch = unsafe{ (*list.first).storage.len_and_epoch(Ordering::Relaxed).epoch() } + 1;
+unsafe {
+foreach_chunk_mut(
+list.first,
+null(),
+|chunk| {
+chunk.storage.set_epoch(new_epoch, Ordering::Relaxed, Ordering::Release);
+Continue(())
+}
+);
+}

+if AUTO_CLEANUP {
+if self.readers.load(Ordering::Relaxed) == 0{
+self.cleanup_impl(list);
+}
+}
+}
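The epoch bump is what makes a moved start position visible to readers: a reader that sees a chunk epoch different from the one it cached knows its cursor may point into dead chunks and must re-sync. A hypothetical reader-side sketch (the field name and control flow are illustrative only, not the actual `event_reader.rs` code):

    // Hypothetical sketch - the real check lives in EventReader.
    let len_and_epoch = chunk.storage.len_and_epoch(Ordering::Acquire);
    if len_and_epoch.epoch() != self.cached_epoch {
        // Queue was cleared/truncated: jump to the queue's start_position.
        self.update_position();
    }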

pub fn clear(&self){
let list = self.list.lock();

let last_chunk = unsafe{ &*list.last };
let len_and_epoch = last_chunk.storage.len_and_epoch(Ordering::Relaxed);
-*self.start_position.lock() = Cursor {

+self.set_start_position(list, Cursor {
chunk: last_chunk,
index: len_and_epoch.len() as usize
-};
+});
}

-// update len_and_start_position_epoch in each chunk
-let new_epoch = len_and_epoch.epoch() + 1;
+/// Shortens the `EventQueue`, keeping the last `chunks_count` chunks and dropping the first ones.
+/// At least one chunk always remains.
+/// Returns the number of freed chunks.
+pub fn truncate_front(&self, chunks_count: usize) -> usize {
+let list = self.list.lock();

+let chunk_id = list.chunk_id_counter as isize - chunks_count as isize + 1;
+if chunk_id <= 0{
+return 0;
+}

+let freed_chunks = chunk_id as usize - unsafe{(*list.first).id};

+let mut start_chunk = null();
unsafe {
-foreach_chunk_mut(
+foreach_chunk(
list.first,
null(),
|chunk| {
-chunk.storage.set_epoch(new_epoch, Ordering::Relaxed, Ordering::Release);
+if chunk.id == chunk_id as usize{
+start_chunk = chunk;
+return Break(());
+}
Continue(())
}
);
}

+self.set_start_position(list, Cursor {
+chunk: start_chunk,
+index: 0
+});

+return freed_chunks;
}
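A usage sketch distilled from `truncate_front_test2` below: `truncate_front` reports the number of chunks logically freed, but chunks still referenced by a reader are physically released only after that reader advances:

    let event = EventQueue::<usize, 4, true>::new();
    event.extend(0..40);                    // 10 chunks of 4 items each
    assert_eq!(event.truncate_front(2), 8); // keep only the last 2 chunks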

// chunks_count can be atomic. But is that needed?
pub fn chunks_count(&self) -> usize {
let list = self.list.lock();
unsafe{
list.chunk_id_counter/*(*list.last).id*/ - (*list.first).id + 1
}
}

// TODO: len in chunks
// TODO: truncate
// TODO: reuse chunks (double/triple buffering)
// TODO: try non-fixed chunks
}
5 changes: 3 additions & 2 deletions src/event_reader.rs
@@ -173,8 +173,9 @@ impl<'a, T, const CHUNK_SIZE: usize, const AUTO_CLEANUP: bool> Iterator for Iter

self.chunk_len = chunk.storage.len_and_epoch(Ordering::Acquire).len() as usize;

-// Maybe 0 when new chunk is created, but item still not pushed.
-// It is faster to have rare additional check here, then in `push`
+// May be 0 when a new chunk is created, but the item is not pushed yet.
+// It is possible to rework `push`/`extend` so that this situation cannot occur.
+// But for now, just keep this check here.
if self.chunk_len == 0 {
return None;
}
4 changes: 2 additions & 2 deletions src/tests/loom_test.rs
@@ -68,7 +68,7 @@ fn loom_mt_write_read_test(){
// to consume leftovers. Since iter's end/sentinel acquired at iter construction.

let (lock, cvar) = &*readers_stop;
-let mut stopped = lock.lock().unwrap();
+let mut stopped = lock.lock();

loop {
for [i0, i1, i2, i3] in reader.iter(){
Expand Down Expand Up @@ -96,7 +96,7 @@ fn loom_mt_write_read_test(){

{
let (lock, cvar) = &*readers_stop;
-let mut stopped = lock.lock().unwrap();
+let mut stopped = lock.lock();
*stopped = true;
cvar.notify_all();
}
57 changes: 57 additions & 0 deletions src/tests/test.rs
@@ -138,6 +138,63 @@ fn clean_test() {
);
}

+#[test]
+fn truncate_front_test1() {
+let event = EventQueue::<usize, 4, true>::new();

+event.extend(0..3);

+let truncated_chunks = event.truncate_front(1);
+assert_eq!(truncated_chunks, 0);
+assert_eq!(event.chunks_count(), 1);

+event.extend(3..6);
+assert_eq!(event.chunks_count(), 2);

+let truncated_chunks = event.truncate_front(1);
+assert_eq!(truncated_chunks, 1);
+assert_eq!(event.chunks_count(), 1);
+}

+#[test]
+fn truncate_front_test2() {
+let event = EventQueue::<usize, 4, true>::new();
+let mut reader = event.subscribe();

+event.extend(0..40);
+assert_eq!(event.chunks_count(), 10);

+let truncated_chunks = event.truncate_front(2);
+assert_eq!(truncated_chunks, 8);

+assert_eq!(event.chunks_count(), 10);
+reader.update_position();
+assert_eq!(event.chunks_count(), 2);

+assert_equal(reader.iter().copied(), 32..40);
+assert_eq!(event.chunks_count(), 1);
+}

+#[test]
+fn chunks_count_test() {
+let event = EventQueue::<usize, 4, true>::new();

+assert_eq!(event.chunks_count(), 1);
+event.push(0);
+event.push(1);
+event.push(2);
+event.push(3);
+assert_eq!(event.chunks_count(), 1);

+event.push(4);
+assert_eq!(event.chunks_count(), 2);
+event.push(5);
+assert_eq!(event.chunks_count(), 2);

+event.clear();
+assert_eq!(event.chunks_count(), 1);
+}

#[test]
fn mt_read_test() {
for _ in 0..10{
