From 34dd9d5de02fcfb8820d3605f26abb2c8e9e0753 Mon Sep 17 00:00:00 2001 From: glorv Date: Tue, 30 Aug 2022 14:25:59 +0800 Subject: [PATCH] limit batch steal size --- crossbeam-deque/src/deque.rs | 172 +++++++++++++++++++++++++++++++++-- 1 file changed, 164 insertions(+), 8 deletions(-) diff --git a/crossbeam-deque/src/deque.rs b/crossbeam-deque/src/deque.rs index bda3bf820..8afe15f4b 100644 --- a/crossbeam-deque/src/deque.rs +++ b/crossbeam-deque/src/deque.rs @@ -693,6 +693,45 @@ impl Stealer { /// assert_eq!(w2.pop(), Some(2)); /// ``` pub fn steal_batch(&self, dest: &Worker) -> Steal<()> { + self.steal_batch_with_limit(dest, MAX_BATCH) + } + + /// Steals no more than `limit` of tasks and pushes them into another worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than the given limit. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::Worker; + /// + /// let w1 = Worker::new_fifo(); + /// w1.push(1); + /// w1.push(2); + /// w1.push(3); + /// w1.push(4); + /// w1.push(5); + /// w1.push(6); + /// + /// let s = w1.stealer(); + /// let w2 = Worker::new_fifo(); + /// + /// let _ = s.steal_batch_with_limit(&w2, 2); + /// assert_eq!(w2.pop(), Some(1)); + /// assert_eq!(w2.pop(), Some(2)); + /// assert_eq!(w2.pop(), None); + /// + /// w1.push(7); + /// w1.push(8); + /// // Setting a large limit does not guarantee that all elements will be popped. In this case, + /// // half of the elements are currently popped, but the number of popped elements is considered + /// // an implementation detail that may be changed in the future. + /// let _ = s.steal_batch_with_limit(&w2, std::usize::MAX); + /// assert_eq!(w2.len(), 3); + /// ``` + pub fn steal_batch_with_limit(&self, dest: &Worker, limit: usize) -> Steal<()> { + assert!(limit > 0); if Arc::ptr_eq(&self.inner, &dest.inner) { if dest.is_empty() { return Steal::Empty; @@ -725,7 +764,7 @@ impl Stealer { } // Reserve capacity for the stolen batch. - let batch_size = cmp::min((len as usize + 1) / 2, MAX_BATCH); + let batch_size = cmp::min((len as usize + 1) / 2, limit); dest.reserve(batch_size); let mut batch_size = batch_size as isize; @@ -891,6 +930,47 @@ impl Stealer { /// assert_eq!(w2.pop(), Some(2)); /// ``` pub fn steal_batch_and_pop(&self, dest: &Worker) -> Steal { + self.steal_batch_with_limit_and_pop(dest, MAX_BATCH) + } + + /// Steals no more than `limit` of tasks, pushes them into another worker, and pops a task from + /// that worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than the given limit. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::{Steal, Worker}; + /// + /// let w1 = Worker::new_fifo(); + /// w1.push(1); + /// w1.push(2); + /// w1.push(3); + /// w1.push(4); + /// w1.push(5); + /// w1.push(6); + /// + /// let s = w1.stealer(); + /// let w2 = Worker::new_fifo(); + /// + /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, 2), Steal::Success(1)); + /// assert_eq!(w2.pop(), Some(2)); + /// assert_eq!(w2.pop(), None); + /// + /// w1.push(7); + /// w1.push(8); + /// // Setting a large limit does not guarantee that all elements will be popped. In this case, + /// // half of the elements are currently popped, but the number of popped elements is considered + /// // an implementation detail that may be changed in the future. + /// assert_eq!(s.steal_batch_with_limit_and_pop(&w2, std::usize::MAX), Steal::Success(3)); + /// assert_eq!(w2.pop(), Some(4)); + /// assert_eq!(w2.pop(), Some(5)); + /// assert_eq!(w2.pop(), None); + /// ``` + pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker, limit: usize) -> Steal { + assert!(limit > 0); if Arc::ptr_eq(&self.inner, &dest.inner) { match dest.pop() { None => return Steal::Empty, @@ -922,7 +1002,7 @@ impl Stealer { } // Reserve capacity for the stolen batch. - let batch_size = cmp::min((len as usize - 1) / 2, MAX_BATCH - 1); + let batch_size = cmp::min((len as usize - 1) / 2, limit - 1); dest.reserve(batch_size); let mut batch_size = batch_size as isize; @@ -1444,6 +1524,43 @@ impl Injector { /// assert_eq!(w.pop(), Some(2)); /// ``` pub fn steal_batch(&self, dest: &Worker) -> Steal<()> { + self.steal_batch_with_limit(dest, MAX_BATCH) + } + + /// Steals no more than of tasks and pushes them into a worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than some constant limit. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::{Injector, Worker}; + /// + /// let q = Injector::new(); + /// q.push(1); + /// q.push(2); + /// q.push(3); + /// q.push(4); + /// q.push(5); + /// q.push(6); + /// + /// let w = Worker::new_fifo(); + /// let _ = q.steal_batch_with_limit(&w, 2); + /// assert_eq!(w.pop(), Some(1)); + /// assert_eq!(w.pop(), Some(2)); + /// assert_eq!(w.pop(), None); + /// + /// q.push(7); + /// q.push(8); + /// // Setting a large limit does not guarantee that all elements will be popped. In this case, + /// // half of the elements are currently popped, but the number of popped elements is considered + /// // an implementation detail that may be changed in the future. + /// let _ = q.steal_batch_with_limit(&w, std::usize::MAX); + /// assert_eq!(w.len(), 3); + /// ``` + pub fn steal_batch_with_limit(&self, dest: &Worker, limit: usize) -> Steal<()> { + assert!(limit > 0); let mut head; let mut block; let mut offset; @@ -1481,15 +1598,15 @@ impl Injector { if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { new_head |= HAS_NEXT; // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(MAX_BATCH); + advance = (BLOCK_CAP - offset).min(limit); } else { let len = (tail - head) >> SHIFT; // Steal half of the available tasks. - advance = ((len + 1) / 2).min(MAX_BATCH); + advance = ((len + 1) / 2).min(limit); } } else { // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(MAX_BATCH); + advance = (BLOCK_CAP - offset).min(limit); } new_head += advance << SHIFT; @@ -1603,6 +1720,45 @@ impl Injector { /// assert_eq!(w.pop(), Some(2)); /// ``` pub fn steal_batch_and_pop(&self, dest: &Worker) -> Steal { + // TODO: we use `MAX_BATCH + 1` as the hard limit for Injecter as the performance is slightly + // better, but we may change it in the future to be compatible with the same method in Stealer. + self.steal_batch_with_limit_and_pop(dest, MAX_BATCH + 1) + } + + /// Steals no more than `limit` of tasks, pushes them into a worker, and pops a task from that worker. + /// + /// How many tasks exactly will be stolen is not specified. That said, this method will try to + /// steal around half of the tasks in the queue, but also not more than the given limit. + /// + /// # Examples + /// + /// ``` + /// use crossbeam_deque::{Injector, Steal, Worker}; + /// + /// let q = Injector::new(); + /// q.push(1); + /// q.push(2); + /// q.push(3); + /// q.push(4); + /// q.push(5); + /// q.push(6); + /// + /// let w = Worker::new_fifo(); + /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, 2), Steal::Success(1)); + /// assert_eq!(w.pop(), Some(2)); + /// assert_eq!(w.pop(), None); + /// + /// q.push(7); + /// // Setting a large limit does not guarantee that all elements will be popped. In this case, + /// // half of the elements are currently popped, but the number of popped elements is considered + /// // an implementation detail that may be changed in the future. + /// assert_eq!(q.steal_batch_with_limit_and_pop(&w, std::usize::MAX), Steal::Success(3)); + /// assert_eq!(w.pop(), Some(4)); + /// assert_eq!(w.pop(), Some(5)); + /// assert_eq!(w.pop(), None); + /// ``` + pub fn steal_batch_with_limit_and_pop(&self, dest: &Worker, limit: usize) -> Steal { + assert!(limit > 0); let mut head; let mut block; let mut offset; @@ -1639,15 +1795,15 @@ impl Injector { if (head >> SHIFT) / LAP != (tail >> SHIFT) / LAP { new_head |= HAS_NEXT; // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1); + advance = (BLOCK_CAP - offset).min(limit); } else { let len = (tail - head) >> SHIFT; // Steal half of the available tasks. - advance = ((len + 1) / 2).min(MAX_BATCH + 1); + advance = ((len + 1) / 2).min(limit); } } else { // We can steal all tasks till the end of the block. - advance = (BLOCK_CAP - offset).min(MAX_BATCH + 1); + advance = (BLOCK_CAP - offset).min(limit); } new_head += advance << SHIFT;