From 7088299fee0c1c6959f28be1a56d424485c5b5a8 Mon Sep 17 00:00:00 2001 From: Brian Malehorn Date: Sat, 15 Apr 2017 22:02:39 -0700 Subject: [PATCH] walk.rs: queue -> stack, sort readdir --- ignore/src/walk.rs | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/ignore/src/walk.rs b/ignore/src/walk.rs index a5989d145..43455b405 100644 --- a/ignore/src/walk.rs +++ b/ignore/src/walk.rs @@ -10,7 +10,7 @@ use std::thread; use std::time::Duration; use std::vec; -use crossbeam::sync::MsQueue; +use crossbeam::sync::TreiberStack; use walkdir::{self, WalkDir, WalkDirIterator, is_same_file}; use dir::{Ignore, IgnoreBuilder}; @@ -864,7 +864,7 @@ impl WalkParallel { ) where F: FnMut() -> Box) -> WalkState + Send + 'static> { let mut f = mkf(); let threads = self.threads(); - let queue = Arc::new(MsQueue::new()); + let stack = Arc::new(TreiberStack::new()); let mut any_work = false; // Send the initial set of root paths to the pool of workers. // Note that we only send directories. For files, we send to them the @@ -884,7 +884,7 @@ impl WalkParallel { } } }; - queue.push(Message::Work(Work { + stack.push(Message::Work(Work { dent: dent, ignore: self.ig_root.clone(), })); @@ -902,7 +902,7 @@ impl WalkParallel { for _ in 0..threads { let worker = Worker { f: mkf(), - queue: queue.clone(), + stack: stack.clone(), quit_now: quit_now.clone(), is_waiting: false, is_quitting: false, @@ -1003,8 +1003,8 @@ impl Work { struct Worker { /// The caller's callback. f: Box) -> WalkState + Send + 'static>, - /// A queue of work items. This is multi-producer and multi-consumer. - queue: Arc>, + /// A stack of work items. This is multi-producer and multi-consumer. + stack: Arc>, /// Whether all workers should quit at the next opportunity. Note that /// this is distinct from quitting because of exhausting the contents of /// a directory. Instead, this is used when the caller's callback indicates @@ -1043,6 +1043,7 @@ impl Worker { while let Some(mut work) = self.get_work() { // If the work is not a directory, then we can just execute the // caller's callback immediately and move on. + if !work.is_dir() { if (self.f)(Ok(work.dent)).is_quit() { self.quit_now(); @@ -1080,7 +1081,25 @@ impl Worker { if self.max_depth.map_or(false, |max| depth >= max) { continue; } - for result in readdir { + + let mut entries: Vec<_> = readdir.collect(); + entries.sort_by(|a, b| { + match (a, b) { + (&Ok(ref a), &Ok(ref b)) => { + if a.file_name() < b.file_name() { + cmp::Ordering::Less + } else { + cmp::Ordering::Greater + } + } + (&Err(_), &Err(_)) => cmp::Ordering::Equal, + (&Ok(_), &Err(_)) => cmp::Ordering::Greater, + (&Err(_), &Ok(_)) => cmp::Ordering::Less, + } + }); + entries.reverse(); + + for result in entries { if self.run_one(&work.ignore, depth + 1, result).is_quit() { self.quit_now(); return; @@ -1092,7 +1111,7 @@ impl Worker { /// Runs the worker on a single entry from a directory iterator. /// /// If the entry is a path that should be ignored, then this is a no-op. - /// Otherwise, the entry is pushed on to the queue. (The actual execution + /// Otherwise, the entry is pushed on to the stack. (The actual execution /// of the callback happens in `run`.) /// /// If an error occurs while reading the entry, then it is sent to the @@ -1144,7 +1163,7 @@ impl Worker { }; if !should_skip_path && !should_skip_filesize { - self.queue.push(Message::Work(Work { + self.stack.push(Message::Work(Work { dent: dent, ignore: ig.clone(), })); @@ -1161,7 +1180,7 @@ impl Worker { if self.is_quit_now() { return None; } - match self.queue.try_pop() { + match self.stack.pop() { Some(Message::Work(work)) => { self.waiting(false); self.quitting(false); @@ -1203,7 +1222,7 @@ impl Worker { self.quitting(false); if self.num_waiting() == self.threads { for _ in 0..self.threads { - self.queue.push(Message::Quit); + self.stack.push(Message::Quit); } } else { // You're right to consider this suspicious, but it's