diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs
index 55832cb906a..1cd5a54fdde 100644
--- a/src/cargo/core/compiler/job_queue.rs
+++ b/src/cargo/core/compiler/job_queue.rs
@@ -54,7 +54,6 @@ use std::collections::{BTreeMap, HashMap, HashSet};
 use std::io;
 use std::marker;
 use std::mem;
-use std::sync::mpsc::{channel, Receiver, Sender};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -74,6 +73,7 @@ use crate::core::{PackageId, TargetKind};
 use crate::handle_error;
 use crate::util;
 use crate::util::diagnostic_server::{self, DiagnosticPrinter};
+use crate::util::Queue;
 use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
 use crate::util::{Config, DependencyQueue};
 use crate::util::{Progress, ProgressStyle};
@@ -99,8 +99,7 @@ struct DrainState<'a, 'cfg> {
     total_units: usize,
 
     queue: DependencyQueue<Unit<'a>, Artifact, Job>,
-    tx: Sender<Message>,
-    rx: Receiver<Message>,
+    messages: Arc<Queue<Message>>,
     active: HashMap<JobId, Unit<'a>>,
     compiled: HashSet<PackageId>,
     documented: HashSet<PackageId>,
@@ -146,7 +145,7 @@ impl std::fmt::Display for JobId {
 
 pub struct JobState<'a> {
     /// Channel back to the main thread to coordinate messages and such.
-    tx: Sender<Message>,
+    messages: Arc<Queue<Message>>,
 
     /// The job id that this state is associated with, used when sending
     /// messages back to the main thread.
@@ -200,7 +199,7 @@ enum Message {
 
 impl<'a> JobState<'a> {
     pub fn running(&self, cmd: &ProcessBuilder) {
-        let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
+        self.messages.push(Message::Run(self.id, cmd.to_string()));
     }
 
     pub fn build_plan(
@@ -209,17 +208,16 @@ impl<'a> JobState<'a> {
         cmd: ProcessBuilder,
         filenames: Arc<Vec<OutputFile>>,
     ) {
-        let _ = self
-            .tx
-            .send(Message::BuildPlanMsg(module_name, cmd, filenames));
+        self.messages
+            .push(Message::BuildPlanMsg(module_name, cmd, filenames));
     }
 
     pub fn stdout(&self, stdout: String) {
-        drop(self.tx.send(Message::Stdout(stdout)));
+        self.messages.push(Message::Stdout(stdout));
    }
 
     pub fn stderr(&self, stderr: String) {
-        drop(self.tx.send(Message::Stderr(stderr)));
+        self.messages.push(Message::Stderr(stderr));
     }
 
     /// A method used to signal to the coordinator thread that the rmeta file
@@ -229,9 +227,8 @@ impl<'a> JobState<'a> {
     /// produced once!
     pub fn rmeta_produced(&self) {
         self.rmeta_required.set(false);
-        let _ = self
-            .tx
-            .send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
+        self.messages
+            .push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
     }
 
     /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
@@ -240,14 +237,14 @@ impl<'a> JobState<'a> {
     /// This should arrange for the associated client to eventually get a token via
     /// `client.release_raw()`.
     pub fn will_acquire(&self) {
-        let _ = self.tx.send(Message::NeedsToken(self.id));
+        self.messages.push(Message::NeedsToken(self.id));
     }
 
     /// The rustc underlying this Job is informing us that it is done with a jobserver token.
     ///
     /// Note that it does *not* write that token back anywhere.
     pub fn release_token(&self) {
-        let _ = self.tx.send(Message::ReleaseToken(self.id));
+        self.messages.push(Message::ReleaseToken(self.id));
     }
 }
 
@@ -341,13 +338,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         let _p = profile::start("executing the job graph");
         self.queue.queue_finished();
 
-        let (tx, rx) = channel();
         let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
         let state = DrainState {
             total_units: self.queue.len(),
             queue: self.queue,
-            tx,
-            rx,
+            messages: Arc::new(Queue::new()),
             active: HashMap::new(),
             compiled: HashSet::new(),
             documented: HashSet::new(),
@@ -365,25 +360,25 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
         };
 
         // Create a helper thread for acquiring jobserver tokens
-        let tx = state.tx.clone();
+        let messages = state.messages.clone();
         let helper = cx
             .jobserver
             .clone()
             .into_helper_thread(move |token| {
-                drop(tx.send(Message::Token(token)));
+                drop(messages.push(Message::Token(token)));
             })
             .chain_err(|| "failed to create helper thread for jobserver management")?;
 
         // Create a helper thread to manage the diagnostics for rustfix if
         // necessary.
-        let tx = state.tx.clone();
+        let messages = state.messages.clone();
         let _diagnostic_server = cx
             .bcx
             .build_config
             .rustfix_diagnostic_server
             .borrow_mut()
             .take()
-            .map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
+            .map(move |srv| srv.start(move |msg| drop(messages.push(Message::FixDiagnostic(msg)))));
 
         crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
             .expect("child threads shouldn't panic")
@@ -585,7 +580,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         // to run above to calculate CPU usage over time. To do this we
         // listen for a message with a timeout, and on timeout we run the
         // previous parts of the loop again.
-        let events: Vec<_> = self.rx.try_iter().collect();
+        let mut events = Vec::new();
+        while let Some(event) = self.messages.try_pop() {
+            events.push(event);
+        }
         info!(
             "tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
             self.tokens.len(),
@@ -603,14 +601,16 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             loop {
                 self.tick_progress();
                 self.tokens.truncate(self.active.len() - 1);
-                match self.rx.recv_timeout(Duration::from_millis(500)) {
-                    Ok(message) => break vec![message],
-                    Err(_) => continue,
+                match self.messages.pop(Duration::from_millis(500)) {
+                    Some(message) => {
+                        events.push(message);
+                        break;
+                    }
+                    None => continue,
                 }
             }
-        } else {
-            events
         }
+        return events;
     }
 
     fn drain_the_queue(
@@ -757,7 +757,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         assert!(self.active.insert(id, *unit).is_none());
         *self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
 
-        let my_tx = self.tx.clone();
+        let messages = self.messages.clone();
         let fresh = job.freshness();
         let rmeta_required = cx.rmeta_required(unit);
 
@@ -769,13 +769,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
         let doit = move || {
             let state = JobState {
                 id,
-                tx: my_tx.clone(),
+                messages: messages.clone(),
                 rmeta_required: Cell::new(rmeta_required),
                 _marker: marker::PhantomData,
             };
 
             let mut sender = FinishOnDrop {
-                tx: &my_tx,
+                messages: &messages,
                 id,
                 result: Err(format_err!("worker panicked")),
             };
@@ -794,9 +794,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             // we need to make sure that the metadata is flagged as produced so
            // send a synthetic message here.
             if state.rmeta_required.get() && sender.result.is_ok() {
-                my_tx
-                    .send(Message::Finish(id, Artifact::Metadata, Ok(())))
-                    .unwrap();
+                messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
             }
 
             // Use a helper struct with a `Drop` implementation to guarantee
@@ -804,7 +802,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             // shouldn't panic unless there's a bug in Cargo, so we just need
             // to make sure nothing hangs by accident.
             struct FinishOnDrop<'a> {
-                tx: &'a Sender<Message>,
+                messages: &'a Queue<Message>,
                 id: JobId,
                 result: CargoResult<()>,
             }
@@ -812,7 +810,8 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
             impl Drop for FinishOnDrop<'_> {
                 fn drop(&mut self) {
                     let msg = mem::replace(&mut self.result, Ok(()));
-                    drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
+                    self.messages
+                        .push(Message::Finish(self.id, Artifact::All, msg));
                 }
             }
         };
diff --git a/src/cargo/util/mod.rs b/src/cargo/util/mod.rs
index 122bb65f98b..a2485c1971c 100644
--- a/src/cargo/util/mod.rs
+++ b/src/cargo/util/mod.rs
@@ -18,6 +18,7 @@ pub use self::paths::{bytes2path, dylib_path, join_paths, path2bytes};
 pub use self::paths::{dylib_path_envvar, normalize_path};
 pub use self::process_builder::{process, ProcessBuilder};
 pub use self::progress::{Progress, ProgressStyle};
+pub use self::queue::Queue;
 pub use self::read2::read2;
 pub use self::rustc::Rustc;
 pub use self::sha256::Sha256;
@@ -50,6 +51,7 @@ pub mod paths;
 pub mod process_builder;
 pub mod profile;
 mod progress;
+mod queue;
 mod read2;
 pub mod rustc;
 mod sha256;
diff --git a/src/cargo/util/queue.rs b/src/cargo/util/queue.rs
new file mode 100644
index 00000000000..d9aefcc3b1c
--- /dev/null
+++ b/src/cargo/util/queue.rs
@@ -0,0 +1,54 @@
+use std::collections::VecDeque;
+use std::sync::{Condvar, Mutex};
+use std::time::{Duration, Instant};
+
+/// A simple, threadsafe, queue of items of type `T`
+///
+/// This is a sort of channel where any thread can push to a queue and any
+/// thread can pop from a queue. Currently queues have infinite capacity where
+/// `push` will never block but `pop` will block.
+pub struct Queue<T> {
+    state: Mutex<State<T>>,
+    condvar: Condvar,
+}
+
+struct State<T> {
+    items: VecDeque<T>,
+}
+
+impl<T> Queue<T> {
+    pub fn new() -> Queue<T> {
+        Queue {
+            state: Mutex::new(State {
+                items: VecDeque::new(),
+            }),
+            condvar: Condvar::new(),
+        }
+    }
+
+    pub fn push(&self, item: T) {
+        self.state.lock().unwrap().items.push_back(item);
+        self.condvar.notify_one();
+    }
+
+    pub fn pop(&self, timeout: Duration) -> Option<T> {
+        let mut state = self.state.lock().unwrap();
+        let now = Instant::now();
+        while state.items.is_empty() {
+            let elapsed = now.elapsed();
+            if elapsed >= timeout {
+                break;
+            }
+            let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap();
+            state = lock;
+            if result.timed_out() {
+                break;
+            }
+        }
+        state.items.pop_front()
+    }
+
+    pub fn try_pop(&self) -> Option<T> {
+        self.state.lock().unwrap().items.pop_front()
+    }
+}