Skip to content

Commit

Permalink
walk: Use unbounded channels
Browse files Browse the repository at this point in the history
We originally switched to bounded channels for backpressure to fix sharkdp#918.
However, bounded channels have a significant initialization overhead as
they pre-allocate a fixed-size buffer for the messages.

This implementation uses a different backpressure strategy: each thread
gets a limited-size pool of WorkerResults.  When the size limit is hit,
the sender thread has to wait for the receiver thread to handle a result
from that pool and recycle it.

Inspired by [snmalloc], results are recycled by sending the boxed result
over a channel back to the thread that allocated it.  By allocating and
freeing each WorkerResult from the same thread, allocator contention is
reduced dramatically.  And since we now pass results by pointer instead
of by value, message passing overhead is reduced as well.

Fixes sharkdp#1408.

[snmalloc]: https://github.com/microsoft/snmalloc
  • Loading branch information
tavianator committed Nov 2, 2023
1 parent 70126ca commit 76e8437
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 31 deletions.
31 changes: 16 additions & 15 deletions src/exec/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ use std::sync::Mutex;
use crossbeam_channel::Receiver;

use crate::config::Config;
use crate::dir_entry::DirEntry;
use crate::error::print_error;
use crate::exit_codes::{merge_exitcodes, ExitCode};
use crate::walk::WorkerResult;
use crate::walk::{WorkerMsg, WorkerResult};

use super::CommandSet;

/// An event loop that listens for inputs from the `rx` receiver. Each received input will
/// generate a command with the supplied command template. The generated command will then
/// be executed, and this process will continue until the receiver's sender has closed.
pub fn job(
rx: Receiver<WorkerResult>,
rx: Receiver<WorkerMsg>,
cmd: &CommandSet,
out_perm: &Mutex<()>,
config: &Config,
Expand All @@ -26,7 +25,8 @@ pub fn job(
loop {
// Obtain the next result from the receiver, else if the channel
// has closed, exit from the loop
let dir_entry: DirEntry = match rx.recv() {
let result = rx.recv().map(WorkerMsg::take);
let dir_entry = match result {
Ok(WorkerResult::Entry(dir_entry)) => dir_entry,
Ok(WorkerResult::Error(err)) => {
if config.show_filesystem_errors {
Expand All @@ -49,18 +49,19 @@ pub fn job(
merge_exitcodes(results)
}

pub fn batch(rx: Receiver<WorkerResult>, cmd: &CommandSet, config: &Config) -> ExitCode {
let paths = rx
.into_iter()
.filter_map(|worker_result| match worker_result {
WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)),
WorkerResult::Error(err) => {
if config.show_filesystem_errors {
print_error(err.to_string());
pub fn batch(rx: Receiver<WorkerMsg>, cmd: &CommandSet, config: &Config) -> ExitCode {
let paths =
rx.into_iter()
.map(WorkerMsg::take)
.filter_map(|worker_result| match worker_result {
WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)),
WorkerResult::Error(err) => {
if config.show_filesystem_errors {
print_error(err.to_string());
}
None
}
None
}
});
});

cmd.execute_batch(paths, config.batch_size, config.path_separator.as_deref())
}
107 changes: 91 additions & 16 deletions src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::thread;
use std::time::{Duration, Instant};

use anyhow::{anyhow, Result};
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
use crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender};
use etcetera::BaseStrategy;
use ignore::overrides::{Override, OverrideBuilder};
use ignore::{self, WalkBuilder, WalkParallel, WalkState};
Expand Down Expand Up @@ -43,6 +43,77 @@ pub enum WorkerResult {
Error(ignore::Error),
}

/// Storage for a WorkerResult.
type ResultBox = Box<Option<WorkerResult>>;

/// A WorkerResult that recycles itself.
pub struct WorkerMsg {
inner: Option<ResultBox>,
tx: Sender<ResultBox>,
}

impl WorkerMsg {
/// Create a new message.
fn new(inner: ResultBox, tx: Sender<ResultBox>) -> Self {
Self {
inner: Some(inner),
tx,
}
}

/// Extract the result from this message.
pub fn take(mut self) -> WorkerResult {
self.inner.as_mut().unwrap().take().unwrap()
}
}

impl Drop for WorkerMsg {
fn drop(&mut self) {
let _ = self.tx.send(self.inner.take().unwrap());
}
}

/// A pool of WorkerResults that can be recycled.
struct ResultPool {
size: usize,
tx: Sender<ResultBox>,
rx: Receiver<ResultBox>,
}

/// Capacity was chosen empircally to perform similarly to an unbounded channel
const RESULT_POOL_CAPACITY: usize = 0x4000;

impl ResultPool {
/// Create an empty pool.
fn new() -> Self {
let (tx, rx) = unbounded();

Self { size: 0, tx, rx }
}

/// Allocate or recycle a WorkerResult from the pool.
fn get(&mut self, result: WorkerResult) -> WorkerMsg {
let inner = if self.size < RESULT_POOL_CAPACITY {
match self.rx.try_recv() {
Ok(mut inner) => {
*inner = Some(result);
inner
}
Err(_) => {
self.size += 1;
Box::new(Some(result))
}
}
} else {
let mut inner = self.rx.recv().unwrap();
*inner = Some(result);
inner
};

WorkerMsg::new(inner, self.tx.clone())
}
}

/// Maximum size of the output buffer before flushing results to the console
const MAX_BUFFER_LENGTH: usize = 1000;
/// Default duration until output buffering switches to streaming.
Expand All @@ -56,8 +127,8 @@ struct ReceiverBuffer<'a, W> {
quit_flag: &'a AtomicBool,
/// The ^C notifier.
interrupt_flag: &'a AtomicBool,
/// Receiver for worker results.
rx: Receiver<WorkerResult>,
/// Receiver for worker messages.
rx: Receiver<WorkerMsg>,
/// Standard output.
stdout: W,
/// The current buffer mode.
Expand All @@ -72,7 +143,7 @@ struct ReceiverBuffer<'a, W> {

impl<'a, W: Write> ReceiverBuffer<'a, W> {
/// Create a new receiver buffer.
fn new(state: &'a WorkerState, rx: Receiver<WorkerResult>, stdout: W) -> Self {
fn new(state: &'a WorkerState, rx: Receiver<WorkerMsg>, stdout: W) -> Self {
let config = &state.config;
let quit_flag = state.quit_flag.as_ref();
let interrupt_flag = state.interrupt_flag.as_ref();
Expand Down Expand Up @@ -104,7 +175,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {

/// Receive the next worker result.
fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
match self.mode {
let result = match self.mode {
ReceiverMode::Buffering => {
// Wait at most until we should switch to streaming
self.rx.recv_deadline(self.deadline)
Expand All @@ -113,7 +184,8 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
// Wait however long it takes for a result
Ok(self.rx.recv()?)
}
}
};
result.map(WorkerMsg::take)
}

/// Wait for a result or state change.
Expand Down Expand Up @@ -319,7 +391,7 @@ impl WorkerState {

/// Run the receiver work, either on this thread or a pool of background
/// threads (for --exec).
fn receive(&self, rx: Receiver<WorkerResult>) -> ExitCode {
fn receive(&self, rx: Receiver<WorkerMsg>) -> ExitCode {
let config = &self.config;

// This will be set to `Some` if the `--exec` argument was supplied.
Expand Down Expand Up @@ -355,12 +427,13 @@ impl WorkerState {
}

/// Spawn the sender threads.
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerResult>) {
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerMsg>) {
walker.run(|| {
let patterns = &self.patterns;
let config = &self.config;
let quit_flag = self.quit_flag.as_ref();
let tx = tx.clone();
let mut pool = ResultPool::new();

Box::new(move |entry| {
if quit_flag.load(Ordering::Relaxed) {
Expand All @@ -387,20 +460,22 @@ impl WorkerState {
DirEntry::broken_symlink(path)
}
_ => {
return match tx.send(WorkerResult::Error(ignore::Error::WithPath {
let result = pool.get(WorkerResult::Error(ignore::Error::WithPath {
path,
err: inner_err,
})) {
}));
return match tx.send(result) {
Ok(_) => WalkState::Continue,
Err(_) => WalkState::Quit,
}
};
}
},
Err(err) => {
return match tx.send(WorkerResult::Error(err)) {
let result = pool.get(WorkerResult::Error(err));
return match tx.send(result) {
Ok(_) => WalkState::Continue,
Err(_) => WalkState::Quit,
}
};
}
};

Expand Down Expand Up @@ -509,7 +584,8 @@ impl WorkerState {
}
}

let send_result = tx.send(WorkerResult::Entry(entry));
let result = pool.get(WorkerResult::Entry(entry));
let send_result = tx.send(result);

if send_result.is_err() {
return WalkState::Quit;
Expand Down Expand Up @@ -545,8 +621,7 @@ impl WorkerState {
.unwrap();
}

// Channel capacity was chosen empircally to perform similarly to an unbounded channel
let (tx, rx) = bounded(0x4000 * config.threads);
let (tx, rx) = unbounded();

let exit_code = thread::scope(|scope| {
// Spawn the receiver thread(s)
Expand Down

0 comments on commit 76e8437

Please sign in to comment.