Skip to content

Commit

Permalink
Merge pull request #1422 from tavianator/batch
Browse files Browse the repository at this point in the history
walk: Send WorkerResults in batches
  • Loading branch information
tavianator authored Nov 29, 2023
2 parents 5903dec + b8a5f95 commit 84f032e
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 52 deletions.
2 changes: 2 additions & 0 deletions src/dir_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,13 @@ use lscolors::{Colorable, LsColors, Style};
use crate::config::Config;
use crate::filesystem::strip_current_dir;

#[derive(Debug)]
enum DirEntryInner {
Normal(ignore::DirEntry),
BrokenSymlink(PathBuf),
}

#[derive(Debug)]
pub struct DirEntry {
inner: DirEntryInner,
metadata: OnceCell<Option<Metadata>>,
Expand Down
31 changes: 16 additions & 15 deletions src/exec/job.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::sync::Mutex;

use crossbeam_channel::Receiver;

use crate::config::Config;
use crate::dir_entry::DirEntry;
use crate::error::print_error;
use crate::exit_codes::{merge_exitcodes, ExitCode};
use crate::walk::WorkerResult;
Expand All @@ -14,43 +11,47 @@ use super::CommandSet;
/// generate a command with the supplied command template. The generated command will then
/// be executed, and this process will continue until the receiver's sender has closed.
pub fn job(
rx: Receiver<WorkerResult>,
results: impl IntoIterator<Item = WorkerResult>,
cmd: &CommandSet,
out_perm: &Mutex<()>,
config: &Config,
) -> ExitCode {
// Output should be buffered when only running a single thread
let buffer_output: bool = config.threads > 1;

let mut results: Vec<ExitCode> = Vec::new();
loop {
let mut ret = ExitCode::Success;
for result in results {
// Obtain the next result from the receiver, else if the channel
// has closed, exit from the loop
let dir_entry: DirEntry = match rx.recv() {
Ok(WorkerResult::Entry(dir_entry)) => dir_entry,
Ok(WorkerResult::Error(err)) => {
let dir_entry = match result {
WorkerResult::Entry(dir_entry) => dir_entry,
WorkerResult::Error(err) => {
if config.show_filesystem_errors {
print_error(err.to_string());
}
continue;
}
Err(_) => break,
};

// Generate a command, execute it and store its exit code.
results.push(cmd.execute(
let code = cmd.execute(
dir_entry.stripped_path(config),
config.path_separator.as_deref(),
out_perm,
buffer_output,
))
);
ret = merge_exitcodes([ret, code]);
}
// Returns error in case of any error.
merge_exitcodes(results)
ret
}

pub fn batch(rx: Receiver<WorkerResult>, cmd: &CommandSet, config: &Config) -> ExitCode {
let paths = rx
pub fn batch(
results: impl IntoIterator<Item = WorkerResult>,
cmd: &CommandSet,
config: &Config,
) -> ExitCode {
let paths = results
.into_iter()
.filter_map(|worker_result| match worker_result {
WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)),
Expand Down
166 changes: 129 additions & 37 deletions src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::io::{self, Write};
use std::mem;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, SendError, Sender};
use etcetera::BaseStrategy;
use ignore::overrides::{Override, OverrideBuilder};
use ignore::{self, WalkBuilder, WalkParallel, WalkState};
Expand All @@ -36,13 +36,91 @@ enum ReceiverMode {

/// The Worker threads can result in a valid entry having PathBuf or an error.
#[allow(clippy::large_enum_variant)]
#[derive(Debug)]
pub enum WorkerResult {
// Errors should be rare, so it's probably better to allow large_enum_variant than
// to box the Entry variant
Entry(DirEntry),
Error(ignore::Error),
}

/// A batch of WorkerResults to send over a channel.
#[derive(Clone)]
struct Batch {
items: Arc<Mutex<Option<Vec<WorkerResult>>>>,
}

impl Batch {
fn new() -> Self {
Self {
items: Arc::new(Mutex::new(Some(vec![]))),
}
}

fn lock(&self) -> MutexGuard<'_, Option<Vec<WorkerResult>>> {
self.items.lock().unwrap()
}
}

impl IntoIterator for Batch {
type Item = WorkerResult;
type IntoIter = std::vec::IntoIter<WorkerResult>;

fn into_iter(self) -> Self::IntoIter {
self.lock().take().unwrap().into_iter()
}
}

/// Wrapper that sends batches of items at once over a channel.
struct BatchSender {
batch: Batch,
tx: Sender<Batch>,
limit: usize,
}

impl BatchSender {
fn new(tx: Sender<Batch>, limit: usize) -> Self {
Self {
batch: Batch::new(),
tx,
limit,
}
}

/// Check if we need to flush a batch.
fn needs_flush(&self, batch: Option<&Vec<WorkerResult>>) -> bool {
match batch {
// Limit the batch size to provide some backpressure
Some(vec) => vec.len() >= self.limit,
// Batch was already taken by the receiver, so make a new one
None => true,
}
}

/// Add an item to a batch.
fn send(&mut self, item: WorkerResult) -> Result<(), SendError<()>> {
let mut batch = self.batch.lock();

if self.needs_flush(batch.as_ref()) {
drop(batch);
self.batch = Batch::new();
batch = self.batch.lock();
}

let items = batch.as_mut().unwrap();
items.push(item);

if items.len() == 1 {
// New batch, send it over the channel
self.tx
.send(self.batch.clone())
.map_err(|_| SendError(()))?;
}

Ok(())
}
}

/// Maximum size of the output buffer before flushing results to the console
const MAX_BUFFER_LENGTH: usize = 1000;
/// Default duration until output buffering switches to streaming.
Expand All @@ -57,7 +135,7 @@ struct ReceiverBuffer<'a, W> {
/// The ^C notifier.
interrupt_flag: &'a AtomicBool,
/// Receiver for worker results.
rx: Receiver<WorkerResult>,
rx: Receiver<Batch>,
/// Standard output.
stdout: W,
/// The current buffer mode.
Expand All @@ -72,7 +150,7 @@ struct ReceiverBuffer<'a, W> {

impl<'a, W: Write> ReceiverBuffer<'a, W> {
/// Create a new receiver buffer.
fn new(state: &'a WorkerState, rx: Receiver<WorkerResult>, stdout: W) -> Self {
fn new(state: &'a WorkerState, rx: Receiver<Batch>, stdout: W) -> Self {
let config = &state.config;
let quit_flag = state.quit_flag.as_ref();
let interrupt_flag = state.interrupt_flag.as_ref();
Expand Down Expand Up @@ -103,7 +181,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
}

/// Receive the next worker result.
fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
fn recv(&self) -> Result<Batch, RecvTimeoutError> {
match self.mode {
ReceiverMode::Buffering => {
// Wait at most until we should switch to streaming
Expand All @@ -119,36 +197,42 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
/// Wait for a result or state change.
fn poll(&mut self) -> Result<(), ExitCode> {
match self.recv() {
Ok(WorkerResult::Entry(dir_entry)) => {
if self.config.quiet {
return Err(ExitCode::HasResults(true));
}
Ok(batch) => {
for result in batch {
match result {
WorkerResult::Entry(dir_entry) => {
if self.config.quiet {
return Err(ExitCode::HasResults(true));
}

match self.mode {
ReceiverMode::Buffering => {
self.buffer.push(dir_entry);
if self.buffer.len() > MAX_BUFFER_LENGTH {
self.stream()?;
}
}
ReceiverMode::Streaming => {
self.print(&dir_entry)?;
self.flush()?;
}
}
match self.mode {
ReceiverMode::Buffering => {
self.buffer.push(dir_entry);
if self.buffer.len() > MAX_BUFFER_LENGTH {
self.stream()?;
}
}
ReceiverMode::Streaming => {
self.print(&dir_entry)?;
self.flush()?;
}
}

self.num_results += 1;
if let Some(max_results) = self.config.max_results {
if self.num_results >= max_results {
return self.stop();
self.num_results += 1;
if let Some(max_results) = self.config.max_results {
if self.num_results >= max_results {
return self.stop();
}
}
}
WorkerResult::Error(err) => {
if self.config.show_filesystem_errors {
print_error(err.to_string());
}
}
}
}
}
Ok(WorkerResult::Error(err)) => {
if self.config.show_filesystem_errors {
print_error(err.to_string());
}
}
Err(RecvTimeoutError::Timeout) => {
self.stream()?;
}
Expand Down Expand Up @@ -319,13 +403,13 @@ impl WorkerState {

/// Run the receiver work, either on this thread or a pool of background
/// threads (for --exec).
fn receive(&self, rx: Receiver<WorkerResult>) -> ExitCode {
fn receive(&self, rx: Receiver<Batch>) -> ExitCode {
let config = &self.config;

// This will be set to `Some` if the `--exec` argument was supplied.
if let Some(ref cmd) = config.command {
if cmd.in_batch_mode() {
exec::batch(rx, cmd, &config)
exec::batch(rx.into_iter().flatten(), cmd, &config)
} else {
let out_perm = Mutex::new(());

Expand All @@ -337,7 +421,8 @@ impl WorkerState {
let rx = rx.clone();

// Spawn a job thread that will listen for and execute inputs.
let handle = scope.spawn(|| exec::job(rx, cmd, &out_perm, &config));
let handle = scope
.spawn(|| exec::job(rx.into_iter().flatten(), cmd, &out_perm, &config));

// Push the handle of the spawned thread into the vector for later joining.
handles.push(handle);
Expand All @@ -355,12 +440,20 @@ impl WorkerState {
}

/// Spawn the sender threads.
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerResult>) {
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<Batch>) {
walker.run(|| {
let patterns = &self.patterns;
let config = &self.config;
let quit_flag = self.quit_flag.as_ref();
let tx = tx.clone();

let mut limit = 0x100;
if let Some(cmd) = &config.command {
if !cmd.in_batch_mode() && config.threads > 1 {
// Evenly distribute work between multiple receivers
limit = 1;
}
}
let mut tx = BatchSender::new(tx.clone(), limit);

Box::new(move |entry| {
if quit_flag.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -545,8 +638,7 @@ impl WorkerState {
.unwrap();
}

// Channel capacity was chosen empircally to perform similarly to an unbounded channel
let (tx, rx) = bounded(0x4000 * config.threads);
let (tx, rx) = bounded(2 * config.threads);

let exit_code = thread::scope(|scope| {
// Spawn the receiver thread(s)
Expand Down

0 comments on commit 84f032e

Please sign in to comment.