diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 464f8d322648..6670ea48d4ec 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -38,6 +38,16 @@ use std::{ time::{Duration, SystemTime}, }; +/// The time period after which the precheck preparation worker is considered unresponsive and will +/// be killed. +// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. +pub const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60); + +/// The time period after which the execute preparation worker is considered unresponsive and will +/// be killed. +// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. +pub const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180); + /// An alias to not spell the type for the oneshot sender for the PVF execution result. pub(crate) type ResultSender = oneshot::Sender>; @@ -51,10 +61,11 @@ pub struct ValidationHost { } impl ValidationHost { - /// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time limit. - /// The result of execution will be sent to the provided result sender. + /// Precheck PVF with the given code, i.e. verify that it compiles within a reasonable time + /// limit. This will prepare the PVF. The result of preparation will be sent to the provided + /// result sender. /// - /// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of + /// This is async to accommodate the possibility of back-pressure. In the vast majority of /// situations this function should return immediately. /// /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. @@ -72,7 +83,7 @@ impl ValidationHost { /// Execute PVF with the given code, execution timeout, parameters and priority. /// The result of execution will be sent to the provided result sender. /// - /// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of + /// This is async to accommodate the possibility of back-pressure. In the vast majority of /// situations this function should return immediately. /// /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. @@ -92,7 +103,7 @@ impl ValidationHost { /// Sends a signal to the validation host requesting to prepare a list of the given PVFs. /// - /// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of + /// This is async to accommodate the possibility of back-pressure. In the vast majority of /// situations this function should return immediately. /// /// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down. @@ -418,6 +429,9 @@ async fn handle_to_host( Ok(()) } +/// Handles PVF prechecking. +/// +/// This tries to prepare the PVF by compiling the WASM blob within a given timeout ([`PRECHECK_COMPILATION_TIMEOUT`]). async fn handle_precheck_pvf( artifacts: &mut Artifacts, prepare_queue: &mut mpsc::Sender, @@ -440,12 +454,24 @@ async fn handle_precheck_pvf( } } else { artifacts.insert_preparing(artifact_id, vec![result_sender]); - send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf }) - .await?; + send_prepare( + prepare_queue, + prepare::ToQueue::Enqueue { + priority: Priority::Normal, + pvf, + compilation_timeout: PRECHECK_COMPILATION_TIMEOUT, + }, + ) + .await?; } Ok(()) } +/// Handles PVF execution. +/// +/// This will first try to prepare the PVF, if a prepared artifact does not already exist. If there is already a +/// preparation job, we coalesce the two preparation jobs. When preparing for execution, we use a more lenient timeout +/// ([`EXECUTE_COMPILATION_TIMEOUT`]) than when prechecking. async fn handle_execute_pvf( cache_path: &Path, artifacts: &mut Artifacts, @@ -462,7 +488,7 @@ async fn handle_execute_pvf( if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { - ArtifactState::Prepared { ref mut last_time_needed } => { + ArtifactState::Prepared { last_time_needed } => { *last_time_needed = SystemTime::now(); send_execute( @@ -485,9 +511,17 @@ async fn handle_execute_pvf( } } else { // Artifact is unknown: register it and enqueue a job with the corresponding priority and - // + // PVF. artifacts.insert_preparing(artifact_id.clone(), Vec::new()); - send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?; + send_prepare( + prepare_queue, + prepare::ToQueue::Enqueue { + priority, + pvf, + compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, + }, + ) + .await?; awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx); } @@ -520,7 +554,11 @@ async fn handle_heads_up( send_prepare( prepare_queue, - prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf }, + prepare::ToQueue::Enqueue { + priority: Priority::Normal, + pvf: active_pvf, + compilation_timeout: EXECUTE_COMPILATION_TIMEOUT, + }, ) .await?; } diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs index df0c619989f2..547ee65f3e9d 100644 --- a/node/core/pvf/src/metrics.rs +++ b/node/core/pvf/src/metrics.rs @@ -155,7 +155,8 @@ impl metrics::Metrics for Metrics { "Time spent in preparing PVF artifacts in seconds", ) .buckets(vec![ - // This is synchronized with COMPILATION_TIMEOUT=60s constant found in + // This is synchronized with the PRECHECK_COMPILATION_TIMEOUT=60s + // and EXECUTE_COMPILATION_TIMEOUT=180s constants found in // src/prepare/worker.rs 0.1, 0.5, @@ -166,6 +167,7 @@ impl metrics::Metrics for Metrics { 20.0, 30.0, 60.0, + 180.0, ]), )?, registry, diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 4902c4c7e3b3..fad6ed167614 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -61,7 +61,12 @@ pub enum ToPool { /// /// In either case, the worker is considered busy and no further `StartWork` messages should be /// sent until either `Concluded` or `Rip` message is received. - StartWork { worker: Worker, code: Arc>, artifact_path: PathBuf }, + StartWork { + worker: Worker, + code: Arc>, + artifact_path: PathBuf, + compilation_timeout: Duration, + }, } /// A message sent from pool to its client. @@ -205,7 +210,7 @@ fn handle_to_pool( metrics.prepare_worker().on_begin_spawn(); mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed()); }, - ToPool::StartWork { worker, code, artifact_path } => { + ToPool::StartWork { worker, code, artifact_path, compilation_timeout } => { if let Some(data) = spawned.get_mut(worker) { if let Some(idle) = data.idle.take() { let preparation_timer = metrics.time_preparation(); @@ -216,6 +221,7 @@ fn handle_to_pool( code, cache_path.to_owned(), artifact_path, + compilation_timeout, preparation_timer, ) .boxed(), @@ -263,9 +269,11 @@ async fn start_work_task( code: Arc>, cache_path: PathBuf, artifact_path: PathBuf, + compilation_timeout: Duration, _preparation_timer: Option, ) -> PoolEvent { - let outcome = worker::start_work(idle, code, &cache_path, artifact_path).await; + let outcome = + worker::start_work(idle, code, &cache_path, artifact_path, compilation_timeout).await; PoolEvent::StartWork(worker, outcome) } diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 5aa1402916d6..a77b88e00345 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -21,7 +21,10 @@ use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pv use always_assert::{always, never}; use async_std::path::PathBuf; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; -use std::collections::{HashMap, VecDeque}; +use std::{ + collections::{HashMap, VecDeque}, + time::Duration, +}; /// A request to pool. #[derive(Debug)] @@ -30,7 +33,7 @@ pub enum ToQueue { /// /// Note that it is incorrect to enqueue the same PVF again without first receiving the /// [`FromQueue`] response. - Enqueue { priority: Priority, pvf: Pvf }, + Enqueue { priority: Priority, pvf: Pvf, compilation_timeout: Duration }, } /// A response from queue. @@ -76,6 +79,8 @@ struct JobData { /// The priority of this job. Can be bumped. priority: Priority, pvf: Pvf, + /// The timeout for the preparation job. + compilation_timeout: Duration, worker: Option, } @@ -91,7 +96,7 @@ impl WorkerData { } /// A queue structured like this is prone to starving, however, we don't care that much since we expect -/// there is going to be a limited number of critical jobs and we don't really care if background starve. +/// there is going to be a limited number of critical jobs and we don't really care if background starve. #[derive(Default)] struct Unscheduled { normal: VecDeque, @@ -203,18 +208,24 @@ impl Queue { async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> { match to_queue { - ToQueue::Enqueue { priority, pvf } => { - handle_enqueue(queue, priority, pvf).await?; + ToQueue::Enqueue { priority, pvf, compilation_timeout } => { + handle_enqueue(queue, priority, pvf, compilation_timeout).await?; }, } Ok(()) } -async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Result<(), Fatal> { +async fn handle_enqueue( + queue: &mut Queue, + priority: Priority, + pvf: Pvf, + compilation_timeout: Duration, +) -> Result<(), Fatal> { gum::debug!( target: LOG_TARGET, validation_code_hash = ?pvf.code_hash, ?priority, + ?compilation_timeout, "PVF is enqueued for preparation.", ); queue.metrics.prepare_enqueued(); @@ -225,7 +236,7 @@ async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Resu "second Enqueue sent for a known artifact" ) { // This function is called in response to a `Enqueue` message; - // Precondtion for `Enqueue` is that it is sent only once for a PVF; + // Precondition for `Enqueue` is that it is sent only once for a PVF; // Thus this should always be `false`; // qed. gum::warn!( @@ -236,7 +247,7 @@ async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Resu return Ok(()) } - let job = queue.jobs.insert(JobData { priority, pvf, worker: None }); + let job = queue.jobs.insert(JobData { priority, pvf, compilation_timeout, worker: None }); queue.artifact_id_to_job.insert(artifact_id, job); if let Some(available) = find_idle_worker(queue) { @@ -424,7 +435,12 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal send_pool( &mut queue.to_pool_tx, - pool::ToPool::StartWork { worker, code: job_data.pvf.code.clone(), artifact_path }, + pool::ToPool::StartWork { + worker, + code: job_data.pvf.code.clone(), + artifact_path, + compilation_timeout: job_data.compilation_timeout, + }, ) .await?; @@ -478,7 +494,7 @@ pub fn start( #[cfg(test)] mod tests { use super::*; - use crate::error::PrepareError; + use crate::{error::PrepareError, host::PRECHECK_COMPILATION_TIMEOUT}; use assert_matches::assert_matches; use futures::{future::BoxFuture, FutureExt}; use slotmap::SlotMap; @@ -571,7 +587,6 @@ mod tests { async fn poll_ensure_to_pool_is_empty(&mut self) { use futures_timer::Delay; - use std::time::Duration; let to_pool_rx = &mut self.to_pool_rx; run_until( @@ -594,7 +609,11 @@ mod tests { async fn properly_concludes() { let mut test = Test::new(2, 2); - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) }); + test.send_queue(ToQueue::Enqueue { + priority: Priority::Normal, + pvf: pvf(1), + compilation_timeout: PRECHECK_COMPILATION_TIMEOUT, + }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); let w = test.workers.insert(()); @@ -607,10 +626,12 @@ mod tests { #[async_std::test] async fn dont_spawn_over_soft_limit_unless_critical() { let mut test = Test::new(2, 3); + let compilation_timeout = PRECHECK_COMPILATION_TIMEOUT; - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) }); - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(2) }); - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(3) }); + let priority = Priority::Normal; + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), compilation_timeout }); + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), compilation_timeout }); + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), compilation_timeout }); // Receive only two spawns. assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -631,7 +652,11 @@ mod tests { assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); // Enqueue a critical job. - test.send_queue(ToQueue::Enqueue { priority: Priority::Critical, pvf: pvf(4) }); + test.send_queue(ToQueue::Enqueue { + priority: Priority::Critical, + pvf: pvf(4), + compilation_timeout, + }); // 2 out of 2 are working, but there is a critical job incoming. That means that spawning // another worker is warranted. @@ -641,15 +666,24 @@ mod tests { #[async_std::test] async fn cull_unwanted() { let mut test = Test::new(1, 2); + let compilation_timeout = PRECHECK_COMPILATION_TIMEOUT; - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) }); + test.send_queue(ToQueue::Enqueue { + priority: Priority::Normal, + pvf: pvf(1), + compilation_timeout, + }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); let w1 = test.workers.insert(()); test.send_from_pool(pool::FromPool::Spawned(w1)); assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); // Enqueue a critical job, which warrants spawning over the soft limit. - test.send_queue(ToQueue::Enqueue { priority: Priority::Critical, pvf: pvf(2) }); + test.send_queue(ToQueue::Enqueue { + priority: Priority::Critical, + pvf: pvf(2), + compilation_timeout, + }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); // However, before the new worker had a chance to spawn, the first worker finishes with its @@ -667,9 +701,10 @@ mod tests { async fn worker_mass_die_out_doesnt_stall_queue() { let mut test = Test::new(2, 2); - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) }); - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(2) }); - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(3) }); + let (priority, compilation_timeout) = (Priority::Normal, PRECHECK_COMPILATION_TIMEOUT); + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), compilation_timeout }); + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), compilation_timeout }); + test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(3), compilation_timeout }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -696,7 +731,11 @@ mod tests { async fn doesnt_resurrect_ripped_worker_if_no_work() { let mut test = Test::new(2, 2); - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) }); + test.send_queue(ToQueue::Enqueue { + priority: Priority::Normal, + pvf: pvf(1), + compilation_timeout: PRECHECK_COMPILATION_TIMEOUT, + }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); @@ -717,7 +756,11 @@ mod tests { async fn rip_for_start_work() { let mut test = Test::new(2, 2); - test.send_queue(ToQueue::Enqueue { priority: Priority::Normal, pvf: pvf(1) }); + test.send_queue(ToQueue::Enqueue { + priority: Priority::Normal, + pvf: pvf(1), + compilation_timeout: PRECHECK_COMPILATION_TIMEOUT, + }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index a9124b3926c5..77570b47360b 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -32,10 +32,6 @@ use parity_scale_codec::{Decode, Encode}; use sp_core::hexdisplay::HexDisplay; use std::{panic, sync::Arc, time::Duration}; -/// The time period after which the preparation worker is considered unresponsive and will be killed. -// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric. -const COMPILATION_TIMEOUT: Duration = Duration::from_secs(60); - /// Spawns a new worker with the given program path that acts as the worker and the spawn timeout. /// /// The program should be able to handle ` prepare-worker ` invocation. @@ -69,6 +65,7 @@ pub async fn start_work( code: Arc>, cache_path: &Path, artifact_path: PathBuf, + compilation_timeout: Duration, ) -> Outcome { let IdleWorker { mut stream, pid } = worker; @@ -103,7 +100,7 @@ pub async fn start_work( } let selected = - match async_std::future::timeout(COMPILATION_TIMEOUT, framed_recv(&mut stream)).await { + match async_std::future::timeout(compilation_timeout, framed_recv(&mut stream)).await { Ok(Ok(response_bytes)) => { // Received bytes from worker within the time limit. // By convention we expect encoded `PrepareResult`. diff --git a/roadmap/implementers-guide/src/pvf-prechecking.md b/roadmap/implementers-guide/src/pvf-prechecking.md index 0daeaf0593c7..1dc7611c0cef 100644 --- a/roadmap/implementers-guide/src/pvf-prechecking.md +++ b/roadmap/implementers-guide/src/pvf-prechecking.md @@ -1,12 +1,34 @@ # PVF Pre-checking Overview -> ⚠️ This discusses a mechanism that is currently not under-development. Follow the progress under [#3211]. +> ⚠️ This discusses a mechanism that is currently under-development. Follow the progress under [#3211]. + +## Terms + +This functionality involves several processes which may be potentially +confusing: + +- **Prechecking:** This is the process of initially checking the PVF when it is + first added. We attempt *preparation* of the PVF and make sure it succeeds + within a given timeout. +- **Execution:** This actually executes the PVF. The node may not have the + artifact from prechecking, in which case this process also includes a + *preparation* job. The timeout for preparation here is more lenient than when + prechecking. +- **Preparation:** This is the process of preparing the WASM blob and includes + both *prevalidation* and *compilation*. As prevalidation is pretty minimal + right now, preparation mostly consists of compilation. Note that *prechecking* + just consists of preparation, whereas *execution* will also prepare the PVF if + the artifact is not already found. +- **Prevalidation:** Right now this just tries to deserialize the binary with + parity-wasm. It is a part of *preparation*. +- **Compilation:** This is the process of compiling a PVF from wasm code to + machine code. It is a part of *preparation*. ## Motivation Parachains' and parathreads' validation function is described by a wasm module that we refer to as a PVF. Since it's a wasm module the typical way of executing it is to compile it to machine code. Typically an optimizing compiler consists of algorithms that are able to optimize the resulting machine code heavily. However, while those algorithms perform quite well for a typical wasm code produced by standard toolchains (e.g. rustc/LLVM), those algorithms can be abused to consume a lot of resources. Moreover, since those algorithms are rather complex there is a lot of room for a bug that can crash the compiler. -If compilation of a Parachain Validation Function (PVF) takes too long or uses too much memory, this can leave a node in limbo as to whether a candidate of that parachain is valid or not. +If compilation of a Parachain Validation Function (PVF) takes too long or uses too much memory, this can leave a node in limbo as to whether a candidate of that parachain is valid or not. The amount of time that a PVF takes to compile is a subjective resource limit and as such PVFs may be maliciously crafted so that there is e.g. a 50/50 split of validators which can and cannot compile and execute the PVF. diff --git a/scripts/ci/gitlab/lingua.dic b/scripts/ci/gitlab/lingua.dic index 3add6a276cf0..3a19233a8fb9 100644 --- a/scripts/ci/gitlab/lingua.dic +++ b/scripts/ci/gitlab/lingua.dic @@ -204,6 +204,7 @@ PoV/MS PoW/MS PR precheck +prechecking preconfigured preimage/MS preopen diff --git a/zombienet_tests/functional/0001-parachains-pvf.zndsl b/zombienet_tests/functional/0001-parachains-pvf.zndsl index 07bc356464fe..1f187498d78f 100644 --- a/zombienet_tests/functional/0001-parachains-pvf.zndsl +++ b/zombienet_tests/functional/0001-parachains-pvf.zndsl @@ -64,14 +64,14 @@ one: reports histogram polkadot_pvf_preparation_time has at least 1 samples in b two: reports histogram polkadot_pvf_preparation_time has at least 1 samples in buckets ["0.1", "0.5", "1", "2", "3", "10"] within 10 seconds # Check all buckets >= 20. -alice: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "+Inf"] within 10 seconds -bob: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "+Inf"] within 10 seconds -charlie: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "+Inf"] within 10 seconds -dave: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "+Inf"] within 10 seconds -ferdie: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "+Inf"] within 10 seconds -eve: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "+Inf"] within 10 seconds -one: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "+Inf"] within 10 seconds -two: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "+Inf"] within 10 seconds +alice: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "180", "+Inf"] within 10 seconds +bob: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "180", "+Inf"] within 10 seconds +charlie: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "180", "+Inf"] within 10 seconds +dave: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "180", "+Inf"] within 10 seconds +ferdie: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "180", "+Inf"] within 10 seconds +eve: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "180", "+Inf"] within 10 seconds +one: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "180", "+Inf"] within 10 seconds +two: reports histogram polkadot_pvf_preparation_time has 0 samples in buckets ["20", "30", "60", "180", "+Inf"] within 10 seconds # Check execution time. # There are two different timeout conditions: BACKING_EXECUTION_TIMEOUT(2s) and