Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

PVF: Remove rayon and some uses of tokio #7153

Merged
merged 16 commits into from
May 16, 2023
26 changes: 17 additions & 9 deletions node/core/pvf/worker/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,14 +177,14 @@ pub mod thread {
}

/// Helper type: shared wait state made of a mutex-guarded `WaitOutcome` flag paired with the
/// `Condvar` used to signal changes to it, wrapped in an `Arc` so it can be handed to threads.
type Cond = Arc<(Mutex<WaitOutcome>, Condvar)>;
pub type Cond = Arc<(Mutex<WaitOutcome>, Condvar)>;

/// Builds a fresh shared condvar whose guarded outcome starts out as `WaitOutcome::Pending`.
pub fn get_condvar() -> Cond {
    let initial_state = Mutex::new(WaitOutcome::Pending);
    let signal = Condvar::new();
    Arc::new((initial_state, signal))
}

/// Runs a thread, afterwards notifying the thread waiting on the condvar. Catches panics and
/// Runs a thread, afterwards notifying the threads waiting on the condvar. Catches panics and
/// resumes them after triggering the condvar, so that the waiting thread is notified on panics.
pub fn spawn_worker_thread<F, R>(
name: &str,
Expand Down Expand Up @@ -221,37 +221,45 @@ pub mod thread {
.spawn(move || cond_notify_on_done(f, cond, outcome))
}

/// Runs `f`, then triggers the condvar with `outcome` so that waiters are woken up. A panic in `f`
/// is caught first and only resumed after the condvar has been triggered, so that waiters are
/// notified on panics too.
///
/// Returns whatever `f` returns; if `f` panicked, the panic is resumed instead of returning.
fn cond_notify_on_done<F, R>(f: F, cond: Cond, outcome: WaitOutcome) -> R
where
F: FnOnce() -> R,
F: panic::UnwindSafe,
{
// Capture a panic instead of unwinding right away, so the notification below always runs.
let result = panic::catch_unwind(|| f());
cond_notify_one(cond, outcome);
cond_notify_all(cond, outcome);
match result {
Ok(inner) => return inner,
// Continue the original panic now that the condvar has been triggered.
Err(err) => panic::resume_unwind(err),
}
}

/// Helper function to notify the thread waiting on this condvar.
fn cond_notify_one(cond: Cond, outcome: WaitOutcome) {
/// Helper function that stores `outcome` in the shared flag and wakes all threads waiting on this
/// condvar. Does nothing if the flag is no longer pending, so the first recorded outcome is kept.
fn cond_notify_all(cond: Cond, outcome: WaitOutcome) {
let (lock, cvar) = &*cond;
let mut flag = lock.lock().unwrap();
// NOTE(review): `Mutex::lock` returns `Err` when the mutex is poisoned (a previous holder
// panicked while holding it), which is the case this `expect` would report -- confirm that the
// message below describes the intended invariant.
let mut flag = lock
.lock()
.expect("only panics if the lock is already held by the current thread; qed");
mrcnski marked this conversation as resolved.
Show resolved Hide resolved
if !flag.is_pending() {
// Someone else already triggered the condvar; leave their outcome in place.
return
}
*flag = outcome;
cvar.notify_one();
cvar.notify_all();
}

/// Blocks the calling thread on the condvar until the shared flag is no longer pending, then
/// returns the outcome that was recorded in it.
pub fn wait_for_threads(cond: Cond) -> WaitOutcome {
let (lock, cvar) = &*cond;
let guard = cvar.wait_while(lock.lock().unwrap(), |flag| flag.is_pending()).unwrap();
// Keep waiting for as long as the predicate (`is_pending`) holds.
let guard = cvar
.wait_while(
lock.lock()
.expect("only panics if the lock is already held by the current thread; qed"),
|flag| flag.is_pending(),
)
.unwrap();
// Dereference the guard to hand the recorded outcome back by value.
*guard
}
}
97 changes: 45 additions & 52 deletions node/core/pvf/worker/src/memory_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@
/// NOTE: Requires jemalloc enabled.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
pub mod memory_tracker {
use crate::{common::stringify_panic_payload, LOG_TARGET};
use polkadot_node_core_pvf::MemoryAllocationStats;
use std::{
sync::mpsc::{Receiver, RecvTimeoutError, Sender},
thread::JoinHandle,
time::Duration,
use crate::{
common::{stringify_panic_payload, thread},
LOG_TARGET,
};
use polkadot_node_core_pvf::MemoryAllocationStats;
use std::{thread::JoinHandle, time::Duration};
use tikv_jemalloc_ctl::{epoch, stats, Error};

#[derive(Clone)]
Expand Down Expand Up @@ -79,14 +78,14 @@ pub mod memory_tracker {
/// 2. Sleep for some short interval. Whenever we wake up, take a snapshot by updating the
/// allocation epoch.
///
/// 3. When we receive a signal that preparation has completed, take one last snapshot and return
/// 3. When we are notified that preparation has completed, take one last snapshot and return
/// the maximum observed values.
///
/// # Errors
///
/// For simplicity, any errors are returned as a string. As this is not a critical component, errors
/// are used for informational purposes (logging) only.
pub fn memory_tracker_loop(finished_rx: Receiver<()>) -> Result<MemoryAllocationStats, String> {
pub fn memory_tracker_loop(condvar: thread::Cond) -> Result<MemoryAllocationStats, String> {
// This doesn't need to be too fine-grained since preparation currently takes 3-10s or more.
// Apart from that, there is not really a science to this number.
const POLL_INTERVAL: Duration = Duration::from_millis(100);
Expand All @@ -105,62 +104,56 @@ pub mod memory_tracker {
Ok(())
};

let (lock, cvar) = &*condvar;
loop {
// Take a snapshot and update the max stats.
update_stats()?;

// Sleep.
match finished_rx.recv_timeout(POLL_INTERVAL) {
// Received finish signal.
Ok(()) => {
update_stats()?;
return Ok(max_stats)
},
// Timed out, restart loop.
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) =>
return Err("memory_tracker_loop: finished_rx disconnected".into()),
// Sleep for the poll interval, or wake up if the condvar is triggered. Note that
// `wait_timeout_while` is documented as not being very precise or reliable, which is
// fine here -- see note above.
let result = cvar
.wait_timeout_while(
lock.lock().expect(
"only panics if the lock is already held by the current thread; qed",
),
POLL_INTERVAL,
|flag| flag.is_pending(),
)
.unwrap();
if result.1.timed_out() {
continue
} else {
update_stats()?;
return Ok(max_stats)
}
}
}

/// Helper function to get the stats from the memory tracker by joining on its thread. Helps
/// isolate this error handling.
///
/// Returns `Some(stats)` when the tracker thread finished successfully. Every failure path logs a
/// warning tagged with `worker_pid` and yields `None`.
pub async fn get_memory_tracker_loop_stats(
thread: JoinHandle<Result<MemoryAllocationStats, String>>,
tx: Sender<()>,
worker_pid: u32,
) -> Option<MemoryAllocationStats> {
// Signal to the memory tracker thread to terminate.
if let Err(err) = tx.send(()) {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"worker: error sending signal to memory tracker_thread: {}",
err
);
None
} else {
// Join on the thread handle.
match thread.join() {
Ok(Ok(stats)) => Some(stats),
Ok(Err(err)) => {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"worker: error occurred in the memory tracker thread: {}", err
);
None
},
Err(err) => {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"worker: error joining on memory tracker thread: {}", stringify_panic_payload(err)
);
None
},
}
}
// Join on the thread handle.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: comment seems superfluous

// NOTE(review): `JoinHandle::join` blocks the calling thread and this is an `async fn` --
// confirm that callers tolerate the blocking call.
match thread.join() {
// The tracker loop completed and returned its collected stats.
Ok(Ok(stats)) => Some(stats),
// The tracker loop ran to completion but returned an error string.
Ok(Err(err)) => {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"worker: error occurred in the memory tracker thread: {}", err
);
None
},
// Joining failed; the payload is turned into a string for the log message.
Err(err) => {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"worker: error joining on memory tracker thread: {}", stringify_panic_payload(err)
);
None
},
}
}
}
Expand Down
19 changes: 7 additions & 12 deletions node/core/pvf/worker/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,15 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {

let preparation_timeout = pvf.prep_timeout();

// Run the memory tracker.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let (memory_tracker_tx, memory_tracker_rx) = channel::<()>();
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_thread = thread::spawn(move || memory_tracker_loop(memory_tracker_rx));

// Condition variable to notify us when a thread is done.
let condvar = thread::get_condvar();

// Run the memory tracker in a regular, non-worker thread.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_condvar = Arc::clone(&condvar);
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_thread = std::thread::spawn(|| memory_tracker_loop(memory_tracker_condvar));

let cpu_time_start = ProcessTime::now();

// Spawn a new thread that runs the CPU time monitor.
Expand Down Expand Up @@ -160,12 +160,7 @@ pub fn worker_entrypoint(socket_path: &str, node_version: Option<&str>) {

// Stop the memory stats worker and get its observed memory stats.
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
let memory_tracker_stats = get_memory_tracker_loop_stats(
memory_tracker_thread,
memory_tracker_tx,
worker_pid,
)
.await;
let memory_tracker_stats = get_memory_tracker_loop_stats(memory_tracker_thread, worker_pid).await;
let memory_stats = MemoryStats {
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
memory_tracker_stats,
Expand Down