Skip to content

Commit

Permalink
Preserve artifact cache unless stale (#1918)
Browse files Browse the repository at this point in the history
Co-authored-by: Marcin S <[email protected]>
  • Loading branch information
eagr and mrcnski authored Nov 19, 2023
1 parent 794ee98 commit b585893
Show file tree
Hide file tree
Showing 22 changed files with 535 additions and 244 deletions.
8 changes: 5 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions polkadot/node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

use polkadot_node_core_pvf::{
InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PrepareError,
PrepareJobKind, PrepareStats, PvfPrepData, ValidationError, ValidationHost,
PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
Expand Down Expand Up @@ -794,7 +794,7 @@ trait ValidationBackend {
validation_result
}

async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<PrepareStats, PrepareError>;
async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>;
}

#[async_trait]
Expand Down Expand Up @@ -824,7 +824,7 @@ impl ValidationBackend for ValidationHost {
})?
}

async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<PrepareStats, PrepareError> {
async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError> {
let (tx, rx) = oneshot::channel();
if let Err(err) = self.precheck_pvf(pvf, tx).await {
// Return an IO error if there was an error communicating with the host.
Expand Down
12 changes: 6 additions & 6 deletions polkadot/node/core/candidate-validation/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ impl ValidationBackend for MockValidateCandidateBackend {
result
}

async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result<PrepareStats, PrepareError> {
async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result<(), PrepareError> {
unreachable!()
}
}
Expand Down Expand Up @@ -1014,11 +1014,11 @@ fn pov_decompression_failure_is_invalid() {
}

struct MockPreCheckBackend {
result: Result<PrepareStats, PrepareError>,
result: Result<(), PrepareError>,
}

impl MockPreCheckBackend {
fn with_hardcoded_result(result: Result<PrepareStats, PrepareError>) -> Self {
fn with_hardcoded_result(result: Result<(), PrepareError>) -> Self {
Self { result }
}
}
Expand All @@ -1034,7 +1034,7 @@ impl ValidationBackend for MockPreCheckBackend {
unreachable!()
}

async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result<PrepareStats, PrepareError> {
async fn precheck_pvf(&mut self, _pvf: PvfPrepData) -> Result<(), PrepareError> {
self.result.clone()
}
}
Expand All @@ -1051,7 +1051,7 @@ fn precheck_works() {

let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
MockPreCheckBackend::with_hardcoded_result(Ok(())),
relay_parent,
validation_code_hash,
)
Expand Down Expand Up @@ -1113,7 +1113,7 @@ fn precheck_invalid_pvf_blob_compression() {

let (check_fut, check_result) = precheck_pvf(
ctx.sender(),
MockPreCheckBackend::with_hardcoded_result(Ok(PrepareStats::default())),
MockPreCheckBackend::with_hardcoded_result(Ok(())),
relay_parent,
validation_code_hash,
)
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ license.workspace = true

[dependencies]
always-assert = "0.1"
blake3 = "1.5"
cfg-if = "1.0"
futures = "0.3.21"
futures-timer = "3.0.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl TestHost {
&self,
code: &[u8],
executor_params: ExecutorParams,
) -> Result<PrepareStats, PrepareError> {
) -> Result<(), PrepareError> {
let (result_tx, result_rx) = futures::channel::oneshot::channel();

let code = sp_maybe_compressed_blob::decompress(code, 16 * 1024 * 1024)
Expand Down
3 changes: 3 additions & 0 deletions polkadot/node/core/pvf/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ seccompiler = "0.4.0"
assert_matches = "1.4.0"
tempfile = "3.3.0"

[build-dependencies]
substrate-build-script-utils = { path = "../../../../../substrate/utils/build-script-utils" }

[features]
# This feature is used to export test code to other crates without putting it in the production build.
test-utils = []
Expand Down
19 changes: 19 additions & 0 deletions polkadot/node/core/pvf/common/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (C) Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

fn main() {
substrate_build_script_utils::generate_wasmtime_version();
}
18 changes: 13 additions & 5 deletions polkadot/node/core/pvf/common/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,24 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::prepare::PrepareStats;
use crate::prepare::{PrepareSuccess, PrepareWorkerSuccess};
use parity_scale_codec::{Decode, Encode};
use std::fmt;

/// Result of PVF preparation performed by the validation host. Contains stats about the preparation
/// if successful
pub type PrepareResult = Result<PrepareStats, PrepareError>;
/// Result of PVF preparation from a worker, with checksum of the compiled PVF and stats of the
/// preparation if successful.
pub type PrepareWorkerResult = Result<PrepareWorkerSuccess, PrepareError>;

/// Result of PVF preparation propagated all the way back to the host, with path to the concluded
/// artifact and stats of the preparation if successful.
pub type PrepareResult = Result<PrepareSuccess, PrepareError>;

/// Result of prechecking PVF performed by the validation host. Contains stats about the preparation
/// if successful.
pub type PrecheckResult = Result<(), PrepareError>;

/// An error that occurred during the prepare part of the PVF pipeline.
// Codec indexes are intended to stabilize pre-encoded payloads (see `OOM_PAYLOAD` below)
// Codec indexes are intended to stabilize pre-encoded payloads (see `OOM_PAYLOAD`)
#[derive(Debug, Clone, Encode, Decode)]
pub enum PrepareError {
/// During the prevalidation stage of preparation an issue was found with the PVF.
Expand Down
2 changes: 2 additions & 0 deletions polkadot/node/core/pvf/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ pub use sp_tracing;

const LOG_TARGET: &str = "parachain::pvf-common";

pub const RUNTIME_VERSION: &str = env!("SUBSTRATE_WASMTIME_VERSION");

use std::{
io::{self, Read, Write},
mem,
Expand Down
19 changes: 19 additions & 0 deletions polkadot/node/core/pvf/common/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use parity_scale_codec::{Decode, Encode};
use std::path::PathBuf;

/// Result from prepare worker if successful.
#[derive(Debug, Clone, Default, Encode, Decode)]
pub struct PrepareWorkerSuccess {
/// Checksum of the compiled PVF.
pub checksum: String,
/// Stats of the current preparation run.
pub stats: PrepareStats,
}

/// Result of PVF preparation if successful.
#[derive(Debug, Clone, Default)]
pub struct PrepareSuccess {
/// Canonical path to the compiled artifact.
pub path: PathBuf,
/// Stats of the current preparation run.
pub stats: PrepareStats,
}

/// Preparation statistics, including the CPU time and memory taken.
#[derive(Debug, Clone, Default, Encode, Decode)]
Expand Down
2 changes: 1 addition & 1 deletion polkadot/node/core/pvf/common/src/pvf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl fmt::Debug for PvfPrepData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"Pvf {{ code, code_hash: {:?}, executor_params: {:?}, prep_timeout: {:?} }}",
"Pvf {{ code: [...], code_hash: {:?}, executor_params: {:?}, prep_timeout: {:?} }}",
self.code_hash, self.executor_params, self.prep_timeout
)
}
Expand Down
1 change: 1 addition & 0 deletions polkadot/node/core/pvf/prepare-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition.workspace = true
license.workspace = true

[dependencies]
blake3 = "1.5"
cfg-if = "1.0"
gum = { package = "tracing-gum", path = "../../../gum" }
libc = "0.2.139"
Expand Down
37 changes: 19 additions & 18 deletions polkadot/node/core/pvf/prepare-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ use nix::{
use os_pipe::{self, PipeReader, PipeWriter};
use parity_scale_codec::{Decode, Encode};
use polkadot_node_core_pvf_common::{
error::{PrepareError, PrepareResult},
error::{PrepareError, PrepareWorkerResult},
executor_intf::create_runtime_from_artifact_bytes,
framed_recv_blocking, framed_send_blocking,
prepare::{MemoryStats, PrepareJobKind, PrepareStats},
prepare::{MemoryStats, PrepareJobKind, PrepareStats, PrepareWorkerSuccess},
pvf::PvfPrepData,
worker::{
cpu_time_monitor_loop, run_worker, stringify_panic_payload,
Expand Down Expand Up @@ -106,7 +106,7 @@ fn recv_request(stream: &mut UnixStream) -> io::Result<PvfPrepData> {
}

/// Send a worker response.
fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
fn send_response(stream: &mut UnixStream, result: PrepareWorkerResult) -> io::Result<()> {
framed_send_blocking(stream, &result.encode())
}

Expand Down Expand Up @@ -186,8 +186,8 @@ fn end_memory_tracking() -> isize {
///
/// 7. If compilation succeeded, write the compiled artifact into a temporary file.
///
/// 8. Send the result of preparation back to the host. If any error occurred in the above steps, we
/// send that in the `PrepareResult`.
/// 8. Send the result of preparation back to the host, including the checksum of the artifact. If
/// any error occurred in the above steps, we send that in the `PrepareWorkerResult`.
pub fn worker_entrypoint(
socket_path: PathBuf,
worker_dir_path: PathBuf,
Expand Down Expand Up @@ -439,11 +439,11 @@ fn handle_child_process(
Err(err) => Err(err),
Ok(ok) => {
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
let (artifact, max_rss) = ok;
} else {
let artifact = ok;
}
if #[cfg(target_os = "linux")] {
let (artifact, max_rss) = ok;
} else {
let artifact = ok;
}
}

// Stop the memory stats worker and get its observed memory stats.
Expand Down Expand Up @@ -511,7 +511,7 @@ fn handle_parent_process(
worker_pid: u32,
usage_before: Usage,
timeout: Duration,
) -> Result<PrepareStats, PrepareError> {
) -> Result<PrepareWorkerSuccess, PrepareError> {
// Read from the child. Don't decode unless the process exited normally, which we check later.
let mut received_data = Vec::new();
pipe_read
Expand Down Expand Up @@ -554,7 +554,7 @@ fn handle_parent_process(

match result {
Err(err) => Err(err),
Ok(response) => {
Ok(JobResponse { artifact, memory_stats }) => {
// The exit status should have been zero if no error occurred.
if exit_status != 0 {
return Err(PrepareError::JobError(format!(
Expand All @@ -577,13 +577,14 @@ fn handle_parent_process(
temp_artifact_dest.display(),
);
// Write to the temp file created by the host.
if let Err(err) = fs::write(&temp_artifact_dest, &response.artifact) {
if let Err(err) = fs::write(&temp_artifact_dest, &artifact) {
return Err(PrepareError::IoErr(err.to_string()))
};

Ok(PrepareStats {
memory_stats: response.memory_stats,
cpu_time_elapsed: cpu_tv,
let checksum = blake3::hash(&artifact.as_ref()).to_hex().to_string();
Ok(PrepareWorkerSuccess {
checksum,
stats: PrepareStats { memory_stats, cpu_time_elapsed: cpu_tv },
})
},
}
Expand Down Expand Up @@ -657,13 +658,13 @@ fn error_from_errno(context: &'static str, errno: Errno) -> PrepareError {

type JobResult = Result<JobResponse, PrepareError>;

/// Pre-encoded length-prefixed `Result::Err(PrepareError::OutOfMemory)`
/// Pre-encoded length-prefixed `JobResult::Err(PrepareError::OutOfMemory)`
const OOM_PAYLOAD: &[u8] = b"\x02\x00\x00\x00\x00\x00\x00\x00\x01\x08";

#[test]
fn pre_encoded_payloads() {
// NOTE: This must match the type of `response` in `send_child_response`.
let oom_unencoded: JobResult = Result::Err(PrepareError::OutOfMemory);
let oom_unencoded: JobResult = JobResult::Err(PrepareError::OutOfMemory);
let oom_encoded = oom_unencoded.encode();
// The payload is prefixed with its length in `framed_send`.
let mut oom_payload = oom_encoded.len().to_le_bytes().to_vec();
Expand Down
Loading

0 comments on commit b585893

Please sign in to comment.