Skip to content

Commit

Permalink
feat(vm): Enable parallelization in VM playground (#2679)
Browse files Browse the repository at this point in the history
## What ❔

Makes batch execution parallelized in VM playground similar to other VM
runner components.

## Why ❔

VM playground is quite slow on stage (~20s / batch).

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
slowli committed Aug 21, 2024
1 parent 1e25508 commit c9ad59e
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 73 deletions.
8 changes: 8 additions & 0 deletions core/lib/config/src/configs/experimental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ pub struct ExperimentalVmPlaygroundConfig {
/// First L1 batch to consider processed. Will not be used if the processing cursor is persisted, unless the `reset` flag is set.
#[serde(default)]
pub first_processed_batch: L1BatchNumber,
/// Maximum number of L1 batches to process in parallel.
#[serde(default = "ExperimentalVmPlaygroundConfig::default_window_size")]
pub window_size: NonZeroU32,
/// If set to true, processing cursor will reset `first_processed_batch` regardless of the current progress. Beware that this will likely
/// require to drop the RocksDB cache.
#[serde(default)]
Expand All @@ -86,6 +89,7 @@ impl Default for ExperimentalVmPlaygroundConfig {
fast_vm_mode: FastVmMode::default(),
db_path: Self::default_db_path(),
first_processed_batch: L1BatchNumber(0),
window_size: Self::default_window_size(),
reset: false,
}
}
Expand All @@ -95,6 +99,10 @@ impl ExperimentalVmPlaygroundConfig {
pub fn default_db_path() -> String {
"./db/vm_playground".to_owned()
}

pub fn default_window_size() -> NonZeroU32 {
NonZeroU32::new(1).unwrap()
}
}

/// Experimental VM configuration options.
Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ impl Distribution<configs::ExperimentalVmPlaygroundConfig> for EncodeDist {
fast_vm_mode: gen_fast_vm_mode(rng),
db_path: self.sample(rng),
first_processed_batch: L1BatchNumber(rng.gen()),
window_size: rng.gen(),
reset: self.sample(rng),
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/lib/protobuf_config/src/experimental.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ impl ProtoRepr for proto::VmPlayground {
.clone()
.unwrap_or_else(Self::Type::default_db_path),
first_processed_batch: L1BatchNumber(self.first_processed_batch.unwrap_or(0)),
window_size: NonZeroU32::new(self.window_size.unwrap_or(1))
.context("window_size cannot be 0")?,
reset: self.reset.unwrap_or(false),
})
}
Expand All @@ -94,6 +96,7 @@ impl ProtoRepr for proto::VmPlayground {
fast_vm_mode: Some(proto::FastVmMode::new(this.fast_vm_mode).into()),
db_path: Some(this.db_path.clone()),
first_processed_batch: Some(this.first_processed_batch.0),
window_size: Some(this.window_size.get()),
reset: Some(this.reset),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ message VmPlayground {
optional string db_path = 2; // optional; defaults to `./db/vm_playground`
optional uint32 first_processed_batch = 3; // optional; defaults to 0
optional bool reset = 4; // optional; defaults to false
optional uint32 window_size = 5; // optional; non-zero; defaults to 1
}

message Vm {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ use zksync_config::configs::ExperimentalVmPlaygroundConfig;
use zksync_node_framework_derive::{FromContext, IntoContext};
use zksync_types::L2ChainId;
use zksync_vm_runner::{
impls::{VmPlayground, VmPlaygroundIo, VmPlaygroundLoaderTask},
impls::{VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundIo, VmPlaygroundLoaderTask},
ConcurrentOutputHandlerFactoryTask,
};

use crate::{
implementations::resources::{
healthcheck::AppHealthCheckResource,
pools::{MasterPool, PoolResource},
pools::{PoolResource, ReplicaPool},
},
StopReceiver, Task, TaskId, WiringError, WiringLayer,
};
Expand All @@ -33,7 +33,8 @@ impl VmPlaygroundLayer {
#[derive(Debug, FromContext)]
#[context(crate = crate)]
pub struct Input {
pub master_pool: PoolResource<MasterPool>,
// We use a replica pool because VM playground doesn't write anything to the DB by design.
pub replica_pool: PoolResource<ReplicaPool>,
#[context(default)]
pub app_health: AppHealthCheckResource,
}
Expand All @@ -60,24 +61,30 @@ impl WiringLayer for VmPlaygroundLayer {

async fn wire(self, input: Self::Input) -> Result<Self::Output, WiringError> {
let Input {
master_pool,
replica_pool,
app_health,
} = input;

// - 1 connection for `StorageSyncTask` which can hold a long-term connection in case it needs to
// catch up cache.
// - 1 connection for `ConcurrentOutputHandlerFactoryTask` / `VmRunner` as they need occasional access
// to DB for querying last processed batch and last ready to be loaded batch.
// - 1 connection for the only running VM instance.
let connection_pool = master_pool.get_custom(3).await?;

// - `window_size` connections for running VM instances.
let connection_pool = replica_pool
.get_custom(2 + self.config.window_size.get())
.await?;

let cursor = VmPlaygroundCursorOptions {
first_processed_batch: self.config.first_processed_batch,
window_size: self.config.window_size,
reset_state: self.config.reset,
};
let (playground, tasks) = VmPlayground::new(
connection_pool,
self.config.fast_vm_mode,
self.config.db_path,
self.zksync_network_id,
self.config.first_processed_batch,
self.config.reset,
cursor,
)
.await?;

Expand Down
5 changes: 4 additions & 1 deletion core/node/vm_runner/src/impls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub use self::{
bwip::{
BasicWitnessInputProducer, BasicWitnessInputProducerIo, BasicWitnessInputProducerTasks,
},
playground::{VmPlayground, VmPlaygroundIo, VmPlaygroundLoaderTask, VmPlaygroundTasks},
playground::{
VmPlayground, VmPlaygroundCursorOptions, VmPlaygroundIo, VmPlaygroundLoaderTask,
VmPlaygroundTasks,
},
protective_reads::{ProtectiveReadsIo, ProtectiveReadsWriter, ProtectiveReadsWriterTasks},
};
32 changes: 21 additions & 11 deletions core/node/vm_runner/src/impls/playground.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
io,
num::NonZeroU32,
path::{Path, PathBuf},
sync::Arc,
};
Expand Down Expand Up @@ -34,6 +35,17 @@ impl From<VmPlaygroundHealth> for Health {
}
}

/// Options related to the VM playground cursor.
#[derive(Debug)]
pub struct VmPlaygroundCursorOptions {
/// First batch to be processed by the playground. Only used if there are no processed batches, or if [`Self.reset_state`] is set.
pub first_processed_batch: L1BatchNumber,
/// Maximum number of L1 batches to process in parallel.
pub window_size: NonZeroU32,
/// If set, reset processing to [`Self.first_processed_batch`].
pub reset_state: bool,
}

/// Virtual machine playground. Does not persist anything in Postgres; instead, keeps an L1 batch cursor as a plain text file in the RocksDB directory
/// (so that the playground doesn't repeatedly process same batches after a restart).
#[derive(Debug)]
Expand All @@ -56,21 +68,17 @@ impl VmPlayground {
vm_mode: FastVmMode,
rocksdb_path: String,
chain_id: L2ChainId,
first_processed_batch: L1BatchNumber,
reset_state: bool,
cursor: VmPlaygroundCursorOptions,
) -> anyhow::Result<(Self, VmPlaygroundTasks)> {
tracing::info!(
"Starting VM playground with mode {vm_mode:?}, first processed batch is #{first_processed_batch} \
(reset processing: {reset_state:?})"
);
tracing::info!("Starting VM playground with mode {vm_mode:?}, cursor options: {cursor:?}");

let cursor_file_path = Path::new(&rocksdb_path).join("__vm_playground_cursor");
let latest_processed_batch = VmPlaygroundIo::read_cursor(&cursor_file_path).await?;
tracing::info!("Latest processed batch: {latest_processed_batch:?}");
let latest_processed_batch = if reset_state {
first_processed_batch
let latest_processed_batch = if cursor.reset_state {
cursor.first_processed_batch
} else {
latest_processed_batch.unwrap_or(first_processed_batch)
latest_processed_batch.unwrap_or(cursor.first_processed_batch)
};

let mut batch_executor = MainBatchExecutor::new(false, false);
Expand All @@ -79,6 +87,7 @@ impl VmPlayground {
let io = VmPlaygroundIo {
cursor_file_path,
vm_mode,
window_size: cursor.window_size.get(),
latest_processed_batch: Arc::new(watch::channel(latest_processed_batch).0),
health_updater: Arc::new(ReactiveHealthCheck::new("vm_playground").1),
};
Expand All @@ -98,7 +107,7 @@ impl VmPlayground {
io,
loader_task_sender,
output_handler_factory,
reset_to_batch: reset_state.then_some(first_processed_batch),
reset_to_batch: cursor.reset_state.then_some(cursor.first_processed_batch),
};
Ok((
this,
Expand Down Expand Up @@ -213,6 +222,7 @@ pub struct VmPlaygroundTasks {
pub struct VmPlaygroundIo {
cursor_file_path: PathBuf,
vm_mode: FastVmMode,
window_size: u32,
// We don't read this value from the cursor file in the `VmRunnerIo` implementation because reads / writes
// aren't guaranteed to be atomic.
latest_processed_batch: Arc<watch::Sender<L1BatchNumber>>,
Expand Down Expand Up @@ -285,7 +295,7 @@ impl VmRunnerIo for VmPlaygroundIo {
.await?
.context("no L1 batches in Postgres")?;
let last_processed_l1_batch = self.latest_processed_batch(conn).await?;
Ok(sealed_l1_batch.min(last_processed_l1_batch + 1))
Ok(sealed_l1_batch.min(last_processed_l1_batch + self.window_size))
}

async fn mark_l1_batch_as_processing(
Expand Down
Loading

0 comments on commit c9ad59e

Please sign in to comment.