From 1056acada0d1a0da5c91f6fce3d1cdfed93fdce0 Mon Sep 17 00:00:00 2001 From: Yin Guanhao Date: Wed, 20 Apr 2022 20:14:26 +0800 Subject: [PATCH 1/2] perf: optimize L1 sync with pipelining --- Cargo.lock | 2 + crates/block-producer/Cargo.toml | 2 + crates/block-producer/src/poller.rs | 353 ++++++++++++++++------------ 3 files changed, 211 insertions(+), 146 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e64f585c..6f9531288 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1865,12 +1865,14 @@ dependencies = [ "gw-version", "gw-web3-indexer", "hex", + "itertools", "lazy_static", "log", "openssl", "opentelemetry", "opentelemetry-jaeger", "rayon", + "scopeguard", "semver", "sentry", "sentry-tracing", diff --git a/crates/block-producer/Cargo.toml b/crates/block-producer/Cargo.toml index 34e00d507..7b2316492 100644 --- a/crates/block-producer/Cargo.toml +++ b/crates/block-producer/Cargo.toml @@ -65,6 +65,8 @@ tokio-metrics = "0.1.0" console-subscriber = "0.1.3" tentacle = { version = "0.4.0-beta.4" } gw-p2p-network = { path = "../p2p-network" } +scopeguard = "1.1.0" +itertools = "0.10.3" [target.'cfg(all(not(target_env = "msvc"), not(target_os="macos")))'.dependencies] tikv-jemallocator = { version = "0.4.0", features = ["unprefixed_malloc_on_supported_platforms"] } diff --git a/crates/block-producer/src/poller.rs b/crates/block-producer/src/poller.rs index 13451856b..b066df69a 100644 --- a/crates/block-producer/src/poller.rs +++ b/crates/block-producer/src/poller.rs @@ -26,13 +26,14 @@ use gw_types::{ prelude::*, }; use gw_web3_indexer::indexer::Web3Indexer; +use itertools::Itertools; use serde_json::json; use std::{ collections::{HashMap, HashSet}, sync::Arc, - time::Duration, + time::{Duration, Instant}, }; -use tokio::sync::Mutex; +use tokio::sync::{mpsc, Mutex}; use tokio_metrics::TaskMonitor; use tracing::instrument; @@ -52,14 +53,19 @@ impl QueryL1TxError { } } -pub struct ChainUpdater { +struct ChainUpdaterContext { chain: Arc>, rpc_client: RPCClient, - last_tx_hash: Option, rollup_context: RollupContext, rollup_type_script: ckb_types::packed::Script, web3_indexer: Option, +} + +pub struct ChainUpdater { + last_tx_hash: Option, initialized: bool, + // Context shared with sub tasks. + context: Arc, sync_monitor: TaskMonitor, } @@ -83,14 +89,17 @@ impl ChainUpdater { }); ChainUpdater { - chain, - rpc_client, - rollup_context, - rollup_type_script, last_tx_hash: None, - web3_indexer, initialized: false, sync_monitor, + context: ChainUpdaterContext { + chain, + rpc_client, + rollup_context, + rollup_type_script, + web3_indexer, + } + .into(), } } @@ -106,7 +115,8 @@ impl ChainUpdater { // Check l1 fork let local_tip_committed_info = { - self.chain + self.context + .chain .lock() .await .local_state() @@ -122,7 +132,7 @@ impl ChainUpdater { committed_l1_block_number, committed_l1_block_hash, ) = { - let chain = self.chain.lock().await; + let chain = self.context.chain.lock().await; let local_tip_block = chain.local_state().tip().raw(); let local_tip_committed_info = chain.local_state().last_synced(); ( @@ -147,16 +157,25 @@ impl ChainUpdater { if initial_syncing { // Start notify mem pool after synced - self.chain.lock().await.complete_initial_syncing().await?; + self.context + .chain + .lock() + .await + .complete_initial_syncing() + .await?; } Ok(()) } #[instrument(skip_all)] - pub async fn try_sync(&mut self) -> anyhow::Result<()> { + async fn try_sync(&mut self) -> anyhow::Result<()> { + tracing::info!("try sync"); + scopeguard::defer! { + tracing::info!("try sync completed"); + } let valid_tip_l1_block_number = { - let chain = self.chain.lock().await; + let chain = self.context.chain.lock().await; let local_tip_block: u64 = chain.local_state().tip().raw().number().unpack(); let local_committed_l1_block: u64 = chain.local_state().last_synced().number().unpack(); @@ -169,7 +188,7 @@ impl ChainUpdater { local_committed_l1_block }; let search_key = SearchKey { - script: self.rollup_type_script.clone().into(), + script: self.context.rollup_type_script.clone().into(), script_type: ScriptType::Type, filter: Some(SearchKeyFilter { script: None, @@ -182,144 +201,98 @@ impl ChainUpdater { }), }; let order = Order::Asc; - let limit = Uint32::from(1000); + let limit = Uint32::from(64); + + let context = self.context.clone(); + let (tx, mut rx) = mpsc::channel(1024); + + // Get transactions in another task so that the actual syncing does not need to wait for it (pipelining). + let task_handle = tokio::spawn(async move { + // TODO: the syncing logic here works under the assumption that a single + // L1 CKB block can contain at most one L2 Godwoken block. The logic + // here needs revising, once we relax this constraint for more performance. + let mut last_cursor = None; + loop { + let txs: Pagination = context + .rpc_client + .indexer + .request( + "get_transactions", + Some(ClientParams::Array(vec![ + json!(search_key), + json!(order), + json!(limit), + json!(last_cursor), + ])), + ) + .await?; + if txs.objects.is_empty() { + break; + } + last_cursor = Some(txs.last_cursor); + + // Dedup because each transaction hash appears twice (input and output). + for tx_hash in txs.objects.into_iter().map(|tx| tx.tx_hash).dedup() { + let result = context + .get_transaction_and_sync_param(&tx_hash) + .await + .map(|(tx, sync_param)| (tx_hash, tx, sync_param)); + let is_err = result.is_err(); + if tx.send(result).await.is_err() || is_err { + break; + } + } + } + Ok::<_, anyhow::Error>(()) + }); + let guard = scopeguard::guard(task_handle, |h| h.abort()); - // TODO: the syncing logic here works under the assumption that a single - // L1 CKB block can contain at most one L2 Godwoken block. The logic - // here needs revising, once we relax this constraint for more performance. - let mut last_cursor = None; - loop { - let txs: Pagination = self - .rpc_client - .indexer - .request( - "get_transactions", - Some(ClientParams::Array(vec![ - json!(search_key), - json!(order), - json!(limit), - json!(last_cursor), - ])), - ) + let mut instant = Instant::now(); + let mut counter = 0u32; + + let context = self.context.clone(); + let mut chain = context.chain.lock().await; + while let Some(result) = rx.recv().await { + let (tx_hash, tx, sync_param) = result?; + self.update_single(&mut *chain, tx_hash, tx, sync_param) .await?; - if txs.objects.is_empty() { - break; + counter += 1; + if counter == 100 { + let now = Instant::now(); + let dur = now.duration_since(instant); + log::info!( + "try sync: synced 100 blocks in {}.{:06}s", + dur.as_secs(), + dur.subsec_micros(), + ); + instant = now; + counter = 0; } - last_cursor = Some(txs.last_cursor); - - log::debug!("[sync revert] poll transactions: {}", txs.objects.len()); - self.update(&txs.objects).await?; } - Ok(()) - } - - #[instrument(skip_all)] - pub async fn update(&mut self, txs: &[Tx]) -> anyhow::Result<()> { - for tx in txs.iter() { - self.update_single(&tx.tx_hash).await?; - self.last_tx_hash = Some(tx.tx_hash.clone()); - } + scopeguard::ScopeGuard::into_inner(guard).await??; Ok(()) } - #[instrument(skip_all)] - async fn update_single(&mut self, tx_hash: &H256) -> anyhow::Result<()> { - if let Some(last_tx_hash) = &self.last_tx_hash { - if last_tx_hash == tx_hash { - log::info!("[sync revert] known last tx hash, skip {}", tx_hash); - return Ok(()); - } + async fn update_single( + &mut self, + chain: &mut Chain, + tx_hash: H256, + tx: Transaction, + sync_param: SyncParam, + ) -> anyhow::Result<()> { + if self.last_tx_hash.as_ref() == Some(&tx_hash) { + log::info!("[sync revert] known last tx hash, skip {}", tx_hash); + return Ok(()); } - let tx: Option = self - .rpc_client - .ckb - .request( - "get_transaction", - Some(ClientParams::Array(vec![json!(tx_hash)])), - ) - .await?; - let tx_with_status = - tx.ok_or_else(|| QueryL1TxError::new(tx_hash, anyhow!("cannot locate tx")))?; - let tx = { - let tx: ckb_types::packed::Transaction = tx_with_status.transaction.inner.into(); - Transaction::new_unchecked(tx.as_bytes()) - }; - let block_hash = tx_with_status.tx_status.block_hash.ok_or_else(|| { - QueryL1TxError::new(tx_hash, anyhow!("tx is not committed on chain!")) - })?; - let header_view: Option = self - .rpc_client - .ckb - .request( - "get_header", - Some(ClientParams::Array(vec![json!(block_hash)])), - ) - .await?; - let header_view = header_view.ok_or_else(|| { - QueryL1TxError::new(tx_hash, anyhow!("cannot locate block {}", block_hash)) - })?; - let l2block_committed_info = L2BlockCommittedInfo::new_builder() - .number(header_view.inner.number.value().pack()) - .block_hash(block_hash.0.pack()) - .transaction_hash(tx_hash.pack()) - .build(); - log::debug!( - "[sync revert] receive new l2 block from {} l1 block tx hash {:?}", - l2block_committed_info.number().unpack(), - tx_hash, - ); - - let rollup_action = self.extract_rollup_action(&tx)?; - let context = match rollup_action.to_enum() { - RollupActionUnion::RollupSubmitBlock(submitted) => { - let l2block = submitted.block(); - let (requests, asset_type_scripts) = self.extract_deposit_requests(&tx).await?; - let withdrawals = self.extract_withdrawals(&tx, &l2block).await?; - - L1ActionContext::SubmitBlock { - l2block, - deposit_requests: requests, - deposit_asset_scripts: asset_type_scripts, - withdrawals, - } - } - RollupActionUnion::RollupEnterChallenge(entered) => { - let (challenge_cell, challenge_lock_args) = - self.extract_challenge_context(&tx).await?; - - L1ActionContext::Challenge { - cell: challenge_cell, - target: challenge_lock_args.target(), - witness: entered.witness(), - } - } - RollupActionUnion::RollupCancelChallenge(_) => L1ActionContext::CancelChallenge, - RollupActionUnion::RollupRevert(reverted) => { - let reverted_blocks = reverted.reverted_blocks().into_iter(); - L1ActionContext::Revert { - reverted_blocks: reverted_blocks.collect(), - } - } - }; - - let update = L1Action { - transaction: tx.clone(), - l2block_committed_info, - context, - }; - let sync_param = SyncParam { - reverts: vec![], - updates: vec![update], - }; - self.chain.lock().await.sync(sync_param).await?; + chain.sync(sync_param).await?; // TODO sync missed block - match &self.web3_indexer { + match &self.context.web3_indexer { Some(indexer) => { - let store = { self.chain.lock().await.store().to_owned() }; + let store = chain.store().to_owned(); if let Err(err) = indexer.store(&store, &tx).await { log::error!("Web3 indexer store failed: {:?}", err); } @@ -327,12 +300,14 @@ impl ChainUpdater { None => {} } + self.last_tx_hash = Some(tx_hash); + Ok(()) } #[instrument(skip_all)] async fn find_l2block_on_l1(&self, committed_info: L2BlockCommittedInfo) -> Result { - let rpc_client = &self.rpc_client; + let rpc_client = &self.context.rpc_client; let tx_hash: gw_common::H256 = From::<[u8; 32]>::from(committed_info.transaction_hash().unpack()); let tx_status = rpc_client.get_transaction_status(tx_hash).await?; @@ -347,7 +322,7 @@ impl ChainUpdater { #[instrument(skip_all)] async fn revert_to_valid_tip_on_l1(&mut self) -> Result<()> { - let db = { self.chain.lock().await.store().begin_transaction() }; + let db = { self.context.chain.lock().await.store().begin_transaction() }; let mut revert_l1_actions = Vec::new(); // First rewind to last valid tip @@ -410,7 +385,7 @@ impl ChainUpdater { reverts: revert_l1_actions, updates: vec![], }; - self.chain.lock().await.sync(param).await?; + self.context.chain.lock().await.sync(param).await?; } // Also revert last tx hash @@ -429,8 +404,97 @@ impl ChainUpdater { Ok(()) } +} + +impl ChainUpdaterContext { + async fn get_transaction_and_sync_param( + &self, + tx_hash: &H256, + ) -> anyhow::Result<(Transaction, SyncParam)> { + let tx: Option = self + .rpc_client + .ckb + .request( + "get_transaction", + Some(ClientParams::Array(vec![json!(tx_hash)])), + ) + .await?; + let tx_with_status = + tx.ok_or_else(|| QueryL1TxError::new(tx_hash, anyhow!("cannot locate tx")))?; + let tx = { + let tx: ckb_types::packed::Transaction = tx_with_status.transaction.inner.into(); + Transaction::new_unchecked(tx.as_bytes()) + }; + let block_hash = tx_with_status.tx_status.block_hash.ok_or_else(|| { + QueryL1TxError::new(tx_hash, anyhow!("tx is not committed on chain!")) + })?; + let header_view: Option = self + .rpc_client + .ckb + .request( + "get_header", + Some(ClientParams::Array(vec![json!(block_hash)])), + ) + .await?; + let header_view = header_view.ok_or_else(|| { + QueryL1TxError::new(tx_hash, anyhow!("cannot locate block {}", block_hash)) + })?; + let l2block_committed_info = L2BlockCommittedInfo::new_builder() + .number(header_view.inner.number.value().pack()) + .block_hash(block_hash.0.pack()) + .transaction_hash(tx_hash.pack()) + .build(); + log::debug!( + "[sync revert] receive new l2 block from {} l1 block tx hash {:?}", + l2block_committed_info.number().unpack(), + tx_hash, + ); + + let rollup_action = self.extract_rollup_action(&tx)?; + let context = match rollup_action.to_enum() { + RollupActionUnion::RollupSubmitBlock(submitted) => { + let l2block = submitted.block(); + let (requests, asset_type_scripts) = self.extract_deposit_requests(&tx).await?; + let withdrawals = self.extract_withdrawals(&tx, &l2block).await?; + + L1ActionContext::SubmitBlock { + l2block, + deposit_requests: requests, + deposit_asset_scripts: asset_type_scripts, + withdrawals, + } + } + RollupActionUnion::RollupEnterChallenge(entered) => { + let (challenge_cell, challenge_lock_args) = + self.extract_challenge_context(&tx).await?; + + L1ActionContext::Challenge { + cell: challenge_cell, + target: challenge_lock_args.target(), + witness: entered.witness(), + } + } + RollupActionUnion::RollupCancelChallenge(_) => L1ActionContext::CancelChallenge, + RollupActionUnion::RollupRevert(reverted) => { + let reverted_blocks = reverted.reverted_blocks().into_iter(); + L1ActionContext::Revert { + reverted_blocks: reverted_blocks.collect(), + } + } + }; + + let update = L1Action { + transaction: tx.clone(), + l2block_committed_info, + context, + }; + let sync_param = SyncParam { + reverts: vec![], + updates: vec![update], + }; + Ok((tx, sync_param)) + } - #[instrument(skip_all)] fn extract_rollup_action(&self, tx: &Transaction) -> Result { let rollup_type_hash: [u8; 32] = { let hash = self.rollup_type_script.calc_script_hash(); @@ -465,7 +529,6 @@ impl ChainUpdater { RollupAction::from_slice(&output_type).map_err(|e| anyhow!("invalid rollup action {}", e)) } - #[instrument(skip_all)] async fn extract_challenge_context( &self, tx: &Transaction, @@ -513,7 +576,6 @@ impl ChainUpdater { unreachable!("challenge output not found"); } - #[instrument(skip_all)] async fn extract_deposit_requests( &self, tx: &Transaction, @@ -594,7 +656,6 @@ impl ChainUpdater { } } -#[instrument(skip_all)] fn try_parse_deposit_request( cell_output: &CellOutput, cell_data: &Bytes, From 79d9d93313428402ca318fb234a6cf6c33ef5da2 Mon Sep 17 00:00:00 2001 From: Yin Guanhao Date: Thu, 21 Apr 2022 12:46:53 +0800 Subject: [PATCH 2/2] chore: jaeger: auto split batch to avoid message too large errors --- Cargo.lock | 28 +++++++++++++++------------- crates/block-producer/Cargo.toml | 6 +++--- crates/block-producer/src/trace.rs | 1 + 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f9531288..a0141a92c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2767,9 +2767,9 @@ dependencies = [ [[package]] name = "integer-encoding" -version = "1.1.7" +version = "3.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f" +checksum = "0e85a1509a128c855368e135cffcde7eac17d8e1083f41e2b98c58bc1a5074be" [[package]] name = "iovec" @@ -3481,13 +3481,15 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.16.0" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf9b1c4e9a6c4de793c632496fa490bdc0e1eea73f0c91394f7b6990935d22" +checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8" dependencies = [ "async-trait", "crossbeam-channel", - "futures", + "futures-channel", + "futures-executor", + "futures-util", "js-sys", "lazy_static", "percent-encoding", @@ -3500,9 +3502,9 @@ dependencies = [ [[package]] name = "opentelemetry-jaeger" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db22f492873ea037bc267b35a0e8e4fb846340058cb7c864efe3d0bf23684593" +checksum = "f8c0b12cd9e3f9b35b52f6e0dac66866c519b26f424f4bbf96e3fe8bfbdc5229" dependencies = [ "async-trait", "lazy_static", @@ -3515,9 +3517,9 @@ dependencies = [ [[package]] name = "opentelemetry-semantic-conventions" -version = "0.8.0" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffeac823339e8b0f27b961f4385057bf9f97f2863bc745bd015fd6091f2270e9" +checksum = "985cc35d832d412224b2cffe2f9194b1b89b6aa5d0bef76d080dce09d90e62bd" dependencies = [ "opentelemetry", ] @@ -5150,9 +5152,9 @@ dependencies = [ [[package]] name = "thrift" -version = "0.13.0" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c6d965454947cc7266d22716ebfd07b18d84ebaf35eec558586bbb2a8cb6b5b" +checksum = "b82ca8f46f95b3ce96081fe3dd89160fdea970c254bb72925255d1b62aae692e" dependencies = [ "byteorder", "integer-encoding", @@ -5502,9 +5504,9 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.16.0" +version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ffbf13a0f8b054a4e59df3a173b818e9c6177c02789871f2073977fd0062076" +checksum = "1f9378e96a9361190ae297e7f3a8ff644aacd2897f244b1ff81f381669196fa6" dependencies = [ "opentelemetry", "tracing", diff --git a/crates/block-producer/Cargo.toml b/crates/block-producer/Cargo.toml index 7b2316492..f241e5fc7 100644 --- a/crates/block-producer/Cargo.toml +++ b/crates/block-producer/Cargo.toml @@ -57,9 +57,9 @@ sentry = { git = "https://github.com/getsentry/sentry-rust", rev = "df694a49595d sentry-tracing = { git = "https://github.com/getsentry/sentry-rust", rev = "df694a49595d6890c510d80b85cfbb4b5ae6159a" } tracing = { version = "0.1", features = ["attributes"] } tracing-subscriber = { version = "0.3", features = ["env-filter", "parking_lot", "smallvec", "tracing-log"] } -tracing-opentelemetry = "0.16" -opentelemetry-jaeger = { version = "0.15", features = ["rt-tokio"] } -opentelemetry = { version = "0.16", features = ["rt-tokio"] } +tracing-opentelemetry = "0.17" +opentelemetry-jaeger = { version = "0.16", features = ["rt-tokio"] } +opentelemetry = { version = "0.17", features = ["rt-tokio"] } # unstable features tokio-metrics = "0.1.0" console-subscriber = "0.1.3" diff --git a/crates/block-producer/src/trace.rs b/crates/block-producer/src/trace.rs index 1eef1904d..29367d429 100644 --- a/crates/block-producer/src/trace.rs +++ b/crates/block-producer/src/trace.rs @@ -38,6 +38,7 @@ pub fn init(trace: Option) -> Result { let jaeger_layer = { let tracer = opentelemetry_jaeger::new_pipeline() .with_service_name("godwoken") + .with_auto_split_batch(true) .install_batch(opentelemetry::runtime::Tokio)?; tracing_opentelemetry::layer().with_tracer(tracer) };