Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Separate preparation timeouts for PVF prechecking and execution (#6139)
Browse files Browse the repository at this point in the history
* Add some documentation

* Add `compilation_timeout` parameter for PVF preparation job

* Update buckets in prometheus metrics

* Update prepare/queue tests

* Update pvf-prechecking overview in implementer docs

* Fix some CI checks
  • Loading branch information
mrcnski authored Oct 13, 2022
1 parent a28b257 commit 851a108
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 53 deletions.
60 changes: 49 additions & 11 deletions node/core/pvf/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ use std::{
time::{Duration, SystemTime},
};

/// The time period after which the precheck preparation worker is considered unresponsive and will
/// be killed.
///
/// This is the `compilation_timeout` attached to the preparation job that `handle_precheck_pvf`
/// enqueues.
// NOTE: If you change this, make sure to fix the buckets of the `pvf_preparation_time` metric.
pub const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);

/// The time period after which the execute preparation worker is considered unresponsive and will
/// be killed.
///
/// This is the `compilation_timeout` attached to the preparation jobs that `handle_execute_pvf`
/// and `handle_heads_up` enqueue. It is deliberately more lenient than the timeout used for
/// prechecking.
// NOTE: If you change this, make sure to fix the buckets of the `pvf_preparation_time` metric.
pub const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180);

/// Shorthand for the oneshot sender used to deliver the result of a PVF execution, so the full
/// type does not have to be spelled out at every use site.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

Expand All @@ -51,10 +61,11 @@ pub struct ValidationHost {
}

impl ValidationHost {
/// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time limit.
/// The result of execution will be sent to the provided result sender.
/// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time
/// limit. This will prepare the PVF. The result of preparation will be sent to the provided
/// result sender.
///
/// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of
/// This is async to accommodate the possibility of back-pressure. In the vast majority of
/// situations this function should return immediately.
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
Expand All @@ -72,7 +83,7 @@ impl ValidationHost {
/// Execute PVF with the given code, execution timeout, parameters and priority.
/// The result of execution will be sent to the provided result sender.
///
/// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of
/// This is async to accommodate the possibility of back-pressure. In the vast majority of
/// situations this function should return immediately.
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
Expand All @@ -92,7 +103,7 @@ impl ValidationHost {

/// Sends a signal to the validation host requesting to prepare a list of the given PVFs.
///
/// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of
/// This is async to accommodate the possibility of back-pressure. In the vast majority of
/// situations this function should return immediately.
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
Expand Down Expand Up @@ -418,6 +429,9 @@ async fn handle_to_host(
Ok(())
}

/// Handles PVF prechecking.
///
/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]).
async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
Expand All @@ -440,12 +454,24 @@ async fn handle_precheck_pvf(
}
} else {
artifacts.insert_preparing(artifact_id, vec![result_sender]);
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
.await?;
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf,
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
},
)
.await?;
}
Ok(())
}

/// Handles PVF execution.
///
/// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there is already a
/// preparation job, we coalesce the two preparation jobs. When preparing for execution, we use a more lenient timeout
/// ([`EXECUTE_COMPILATION_TIMEOUT`]) than when prechecking.
async fn handle_execute_pvf(
cache_path: &Path,
artifacts: &mut Artifacts,
Expand All @@ -462,7 +488,7 @@ async fn handle_execute_pvf(

if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { ref mut last_time_needed } => {
ArtifactState::Prepared { last_time_needed } => {
*last_time_needed = SystemTime::now();

send_execute(
Expand All @@ -485,9 +511,17 @@ async fn handle_execute_pvf(
}
} else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
//
// PVF.
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority,
pvf,
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT,
},
)
.await?;

awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
}
Expand Down Expand Up @@ -520,7 +554,11 @@ async fn handle_heads_up(

send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf: active_pvf,
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT,
},
)
.await?;
}
Expand Down
4 changes: 3 additions & 1 deletion node/core/pvf/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ impl metrics::Metrics for Metrics {
"Time spent in preparing PVF artifacts in seconds",
)
.buckets(vec![
// This is synchronized with COMPILATION_TIMEOUT=60s constant found in
// This is synchronized with the PRECHECK_COMPILATION_TIMEOUT=60s
// and EXECUTE_COMPILATION_TIMEOUT=180s constants found in
// src/host.rs
0.1,
0.5,
Expand All @@ -166,6 +167,7 @@ impl metrics::Metrics for Metrics {
20.0,
30.0,
60.0,
180.0,
]),
)?,
registry,
Expand Down
14 changes: 11 additions & 3 deletions node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ pub enum ToPool {
///
/// In either case, the worker is considered busy and no further `StartWork` messages should be
/// sent until either a `Concluded` or a `Rip` message is received.
StartWork { worker: Worker, code: Arc<Vec<u8>>, artifact_path: PathBuf },
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
compilation_timeout: Duration,
},
}

/// A message sent from pool to its client.
Expand Down Expand Up @@ -205,7 +210,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path } => {
ToPool::StartWork { worker, code, artifact_path, compilation_timeout } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
Expand All @@ -216,6 +221,7 @@ fn handle_to_pool(
code,
cache_path.to_owned(),
artifact_path,
compilation_timeout,
preparation_timer,
)
.boxed(),
Expand Down Expand Up @@ -263,9 +269,11 @@ async fn start_work_task<Timer>(
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
compilation_timeout: Duration,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome = worker::start_work(idle, code, &cache_path, artifact_path).await;
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, compilation_timeout).await;
PoolEvent::StartWork(worker, outcome)
}

Expand Down
Loading

0 comments on commit 851a108

Please sign in to comment.