diff --git a/bin/akula.rs b/bin/akula.rs index 192d03a5..abc15fa4 100644 --- a/bin/akula.rs +++ b/bin/akula.rs @@ -5,7 +5,7 @@ use akula::{ consensus::{engine_factory, Consensus, ForkChoiceMode, InitialParams, ParliaInitialParams}, crypto::signer::ECDSASigner, kv::tables::CHAINDATA_TABLES, - mining::state::*, + mining::{state::*, StagedMining}, models::*, p2p::node::NodeBuilder, rpc::{ @@ -253,7 +253,6 @@ fn main() -> anyhow::Result<()> { let chain_config = ChainConfig::from(chainspec.clone()); - let mining_db = db.clone(); if !opt.no_rpc { tokio::spawn({ let db = db.clone(); @@ -482,7 +481,7 @@ fn main() -> anyhow::Result<()> { InitialParams::Parlia(ParliaInitialParams { bls_prv_key: opt.bls_secret_key, bls_pub_key: opt.bls_public_key, - node: Some(Arc::clone(&node)), + node: Some(node.clone()), sync_stage: Some(staged_sync.current_stage()), }) } @@ -553,7 +552,7 @@ fn main() -> anyhow::Result<()> { if !opt.skip_commitment { staged_sync.push(HashState::new(etl_temp_dir.clone(), None), !opt.prune); staged_sync.push_with_unwind_priority( - Interhashes::new(etl_temp_dir.clone(), None), + Interhashes::new(etl_temp_dir.clone(), None, None), !opt.prune, 1, ); @@ -588,7 +587,6 @@ fn main() -> anyhow::Result<()> { ); if can_mine { - staged_sync.is_mining = true; info!("Running staged mining"); let mut consensus_config = engine_factory( Some(db.clone()), @@ -596,89 +594,66 @@ fn main() -> anyhow::Result<()> { Some(opt.engine_listen_address), params.clone(), )?; - tokio::spawn(async move { - let mut staged_mining = stagedsync::StagedSync::new(); - staged_mining.is_mining = true; - consensus_config.authorize(ECDSASigner::from_secret(&opt.mine_secretkey.unwrap()[..])); - let config = MiningConfig { - enabled: true, - ether_base: opt.mine_etherbase.unwrap().clone(), - secret_key: opt.mine_secretkey.unwrap().clone(), - extra_data: opt.mine_extradata.map(Bytes::from).clone(), - consensus: consensus_config, - dao_fork_block: Some(BigInt::new(num_bigint::Sign::Plus, vec![])), - dao_fork_support: false, - gas_limit: 30000000, - }; - let mining_config_mutex = Arc::new(Mutex::new(config)); - info!("Mining enabled"); - let mining_block = MiningBlock { - header: BlockHeader { - parent_hash: H256::zero(), - ommers_hash: H256::zero(), - beneficiary: Address::zero(), - state_root: H256::zero(), - transactions_root: H256::zero(), - receipts_root: H256::zero(), - logs_bloom: Bloom::zero(), - difficulty: U256::ZERO, - number: BlockNumber(0), - gas_limit: 0, - gas_used: 0, - timestamp: 0, - extra_data: Bytes::new(), - mix_hash: H256::zero(), - nonce: H64::zero(), - base_fee_per_gas: None, - }, - ommers: Default::default(), - transactions: vec![], - }; - let mining_block_mutex = Arc::new(Mutex::new(mining_block)); - let mining_status = MiningStatus::new(); - let mining_status_mutex = Arc::new(Mutex::new(mining_status)); - staged_mining.push( - CreateBlock { - mining_status: Arc::clone(&mining_status_mutex), - mining_block: Arc::clone(&mining_block_mutex), - mining_config: Arc::clone(&mining_config_mutex), - chain_spec: chainspec.clone(), - }, - false, - ); - + // TODO start mining async, just set sync mining now + // tokio::spawn(async move { + let mut staged_mining = StagedMining::new(); + consensus_config.authorize(ECDSASigner::from_secret(&opt.mine_secretkey.unwrap()[..])); + let mining_config = Arc::new(Mutex::new(MiningConfig { + enabled: true, + ether_base: opt.mine_etherbase.unwrap().clone(), + secret_key: opt.mine_secretkey.unwrap().clone(), + extra_data: opt.mine_extradata.map(Bytes::from).clone(), + consensus: consensus_config, + dao_fork_block: Some(BigInt::new(num_bigint::Sign::Plus, vec![])), + dao_fork_support: false, + gas_limit: 30000000, + })); + info!("Mining enabled"); + let mining_block = Arc::new(Mutex::new(MiningBlock::default())); + let mining_status = Arc::new(Mutex::new(MiningStatus::new())); staged_mining.push( - MiningExecBlock { - mining_status: Arc::clone(&mining_status_mutex), - mining_block: Arc::clone(&mining_block_mutex), - mining_config: Arc::clone(&mining_config_mutex), - chain_spec: chainspec.clone(), - }, - false, - ); + CreateBlock { + mining_status: mining_status.clone(), + mining_block: mining_block.clone(), + mining_config: mining_config.clone(), + chain_spec: chainspec.clone(), + node: node.clone(), + }, + ); - // TODO Error: Faulty cumulative index: max gas less than current gas (1395722 < 1532487), you should mining after latest block - // staged_sync.push(HashState::new(etl_temp_dir.clone(), None), !opt.prune); - // - // staged_sync.push_with_unwind_priority( - // Interhashes::new(etl_temp_dir.clone(), None), - // !opt.prune, - // 1, - // ); - info!("createBlock stage enabled"); + staged_mining.push( + MiningExecBlock { + mining_status: mining_status.clone(), + mining_block: mining_block.clone(), + mining_config: mining_config.clone(), + chain_spec: chainspec.clone(), + }, + ); - staged_mining.push( - MiningFinishBlock { - mining_status: Arc::clone(&mining_status_mutex), - mining_block: Arc::clone(&mining_block_mutex), - mining_config: Arc::clone(&mining_config_mutex), - chain_spec: chainspec.clone(), - node: node.clone(), - }, - false, - ); - staged_mining.run(&mining_db).await.unwrap() - }); + staged_mining.push( + TotalGasIndex {}, + ); + + staged_mining.push(HashState::new(etl_temp_dir.clone(), None)); + + staged_mining.push( + Interhashes::new(etl_temp_dir.clone(), None, Some(mining_block.clone())), + ); + info!("createBlock stage enabled"); + + staged_mining.push( + MiningFinishBlock { + mining_status: mining_status.clone(), + mining_block: mining_block.clone(), + mining_config: mining_config.clone(), + chain_spec: chainspec.clone(), + node: node.clone(), + }, + ); + // TODO start mining async, just set sync mining + // staged_mining.run(&mining_db).await.unwrap() + staged_sync.enable_mining(staged_mining); + // }); } info!("Running staged sync"); diff --git a/src/consensus/fork_choice_graph.rs b/src/consensus/fork_choice_graph.rs index 0f016249..9aeed501 100644 --- a/src/consensus/fork_choice_graph.rs +++ b/src/consensus/fork_choice_graph.rs @@ -58,6 +58,7 @@ impl ForkChoiceGraph { self.skip_list.clear(); self.chains.clear(); self.q.clear(); + info!("fork choice graph clear q {:?}", self.q); } #[inline] @@ -72,10 +73,12 @@ impl ForkChoiceGraph { #[inline] pub fn insert_with_hash(&mut self, hash: H256, header: BlockHeader) { + info!("try insert {}:{:?}", header.number, hash); if self.q.contains_key(&hash) { return; } + info!("inserted {}:{:?}", header.number, hash); self.skip_list .entry(header.parent_hash) .or_insert(HashSet::new()) @@ -94,12 +97,18 @@ impl ForkChoiceGraph { pub fn chain_head(&mut self) -> Option { let mut roots = HashSet::new(); + info!( + "chain_head skip_list {:?}, raw {:?}, q {:?}", + self.skip_list, self.raw, self.q + ); for (hash, _) in self.q.iter() { + info!("chain_head q.iter {:?}", hash); if !self.skip_list.contains_key(hash) && self.raw.contains_key(hash) { roots.insert(*hash); } } if roots.is_empty() { + info!("chain_head got empty roots."); return None; } @@ -129,6 +138,7 @@ impl ForkChoiceGraph { }; Some(*head_hash) } else { + info!("chain_head max_by_key none."); None } } diff --git a/src/consensus/mod.rs b/src/consensus/mod.rs index 9100a295..51513087 100644 --- a/src/consensus/mod.rs +++ b/src/consensus/mod.rs @@ -32,6 +32,7 @@ use std::{ }; use tokio::sync::watch; use tracing::*; +use crate::p2p::node::Node; #[derive(Debug)] pub enum FinalizationChange { @@ -195,8 +196,9 @@ pub trait Consensus: Debug + Send + Sync + 'static { /// return bool indicate if block is seal correctly. if false, just skip it. fn seal( &mut self, + _node: Arc, _header_reader: &dyn HeaderReader, - _block: &mut Block, + mut _block: Block, ) -> anyhow::Result { Ok(true) } diff --git a/src/consensus/parlia/mod.rs b/src/consensus/parlia/mod.rs index dd7c2cba..618a44fd 100644 --- a/src/consensus/parlia/mod.rs +++ b/src/consensus/parlia/mod.rs @@ -37,11 +37,13 @@ use milagro_bls::{AggregateSignature, PublicKey}; use parking_lot::RwLock; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, - time::{Duration, SystemTime}, + time::{Duration, SystemTime, UNIX_EPOCH}, }; +use std::ops::{Add, Sub}; use tokio::sync::watch::Receiver as WatchReceiver; use tracing::*; use TransactionAction; +use rand::prelude::*; pub const EXTRA_VANITY: usize = 32; /// Fixed number of extra-data prefix bytes reserved for signer vanity @@ -84,6 +86,10 @@ const INIT_TX_NUM: usize = 7; const BACKOFF_TIME_OF_INITIAL: u64 = 1_u64; /// Random additional delay (per signer) to allow concurrent signers, second const BACKOFF_TIME_OF_WIGGLE: u64 = 1_u64; +/// Default delay (per signer) to allow concurrent signers before ramanujan fork, millisecond +const BACKOFF_MILL_TIME_OF_FIXED_BEFORE_FORK: u64 = 200_u64; +/// Random additional delay (per signer) to allow concurrent signers before ramanujan fork, millisecond +const BACKOFF_MILL_TIME_OF_WIGGLE_BEFORE_FORK: u64 = 500_u64; /// process delay (per signer) to allow concurrent signers, second const BACKOFF_TIME_OF_PROCESS: u64 = 1_u64; /// Maximum the gas limit may ever be. @@ -582,6 +588,49 @@ impl Parlia { Ok(()) } + fn block_time_for_ramanujan_fork( + &self, + snap: &Snapshot, + header: &BlockHeader, + parent: &BlockHeader, + ) -> u64 { + let mut block_timestamp = parent.timestamp + self.period; + if self.chain_spec.is_ramanujan(&header.number) { + block_timestamp += self.back_off_time(snap, &header); + } + block_timestamp + } + + fn delay_for_Ramanujan_fork( + &self, + snap: &Snapshot, + header: &BlockHeader + ) -> Duration { + let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap(); + let mut delay = Duration::from_secs(header.timestamp).checked_sub(now); + match delay { + Some(delay) => { + info!("delay_for_Ramanujan_fork now {:?}, header.timestamp {}, delay {:?}", now, header.timestamp, delay); + if self.chain_spec.is_ramanujan(&header.number) { + return delay; + } + + if header.difficulty == DIFF_NOTURN { + // It's not our turn explicitly to sign, delay it a bit + let wiggle = ((snap.validators.len() / 2 + 1) as u64) * BACKOFF_MILL_TIME_OF_WIGGLE_BEFORE_FORK; + let mut rng = RngSource::new(1); + delay.add(Duration::from_millis(BACKOFF_MILL_TIME_OF_FIXED_BEFORE_FORK + (rng.int63n(wiggle as i64)) as u64)); + } + + delay + } + None => { + Duration::from_millis(1) + } + } + + } + fn back_off_time(&self, snap: &Snapshot, header: &BlockHeader) -> u64 { let validator = &(header.beneficiary as Address); if snap.inturn(validator) { @@ -1166,11 +1215,19 @@ impl Consensus for Parlia { // Mix digest is reserved for now, set to empty header.mix_hash = H256::zero(); - // TODO Ensure the timestamp has the correct delay - // header.Time = self.blockTimeForRamanujanFork(snap, header, parent); - // if header.Time < uint64(time.Now().Unix()) { - // header.Time = uint64(time.Now().Unix()) - // } + let parent = + header_reader + .read_parent_header(header)? + .ok_or(ParliaError::UnknownHeader { + number: header.number.parent(), + hash: header.parent_hash, + })?; + // Ensure the timestamp has the correct delay + header.timestamp = self.block_time_for_ramanujan_fork(&snap, header, &parent); + let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_secs(); + if header.timestamp < now { + header.timestamp = now + } Ok(()) } @@ -1277,8 +1334,9 @@ impl Consensus for Parlia { fn seal( &mut self, + node: Arc, header_reader: &dyn HeaderReader, - block: &mut Block, + mut block: Block, ) -> anyhow::Result { let header = &block.header; let block_number = header.number; @@ -1313,10 +1371,10 @@ impl Consensus for Parlia { } // if we're amongst the recent signers, wait for the next block - for (last, val) in snap.recent_proposers { - if proposer == val { + for (last, val) in &snap.recent_proposers { + if proposer == *val { let limit = (snap.validators.len() / 2 + 1) as u64; - if block_number.0 < limit || last > block_number.0 - limit { + if block_number.0 < limit || *last > block_number.0 - limit { info!( "signed recently, must wait for others, block {:?}:{:?}", block_number, block_hash @@ -1326,11 +1384,10 @@ impl Consensus for Parlia { } } - // TODO Sweet, the protocol permits us to sign the block, wait for our time - // let delay = self.delayForRamanujanFork(snap, header); - let delay = 1_u64; + // Sweet, the protocol permits us to sign the block, wait for our time + let delay = self.delay_for_Ramanujan_fork(&snap, header); - info!("consensus seal the block {:?}:{:?}, proposer: {:?}, delay: {}, difficulty: {}, gasUsed: {}, txsRoot: {:?}, stateRoot: {:?}", + info!("consensus seal the block {:?}:{:?}, proposer: {:?}, delay: {:?}, difficulty: {}, gasUsed: {}, txsRoot: {:?}, stateRoot: {:?}", block_number, block_hash, proposer, delay, header.difficulty, header.gas_used, header.transactions_root, header.state_root ); @@ -1344,23 +1401,24 @@ impl Consensus for Parlia { tmp.extend_from_slice(&sig[..]); block.header.extra_data = tmp.freeze(); - //TODO tmp sync delay - thread::sleep(Duration::from_secs(delay)); - - // TODO add wait block process - // if self.shouldWaitForCurrentBlockProcess(header_reader, header, snap) { - // info!( - // "waiting for received in turn block to process, block {:?}:{:?}", - // block_number, block_hash - // ); - // - // //TODO tmp sync delay - // thread::sleep(Duration::from_secs(BACKOFF_TIME_OF_PROCESS)); - // info!( - // "process backoff time exhausted, start to seal block, block {:?}:{:?}", - // block_number, block_hash - // ); - // } + // Wait until sealing is terminated or delay timeout. + tokio::spawn(async move { + let header = &block.header; + info!("waiting to propagate, delay {:?}", delay); + tokio::time::sleep(delay).await; + if should_wait_current_block_process(node.clone(), header) { + info!("waiting for received in turn block to process, block {:?}:{:?}", block_number, block_hash); + tokio::time::sleep(Duration::from_secs(BACKOFF_TIME_OF_PROCESS)).await; + } + // TODO set correct TD + let td = node.status.read().td + header.difficulty; + // Broadcast the mined block to other p2p nodes. + let sent_request_id = rand::thread_rng().gen(); + // TODO add mined block into stageSync + info!("finally, we could send_new_mining_block to others, block: {:?}:{:?}, {:?}", block_number, block_hash, block); + node.send_new_mining_block(sent_request_id, block, td) + .await; + }); Ok(true) } @@ -1740,3 +1798,16 @@ fn calculate_difficulty(snap: &Snapshot, signer: &Address) -> U256 { } DIFF_NOTURN } + +fn should_wait_current_block_process(node: Arc, header: &BlockHeader) -> bool { + if header.difficulty == DIFF_INTURN { + return false; + } + + let status = node.status.read(); + if status.parent_hash == header.parent_hash { + return true; + } + + false +} diff --git a/src/consensus/parlia/util.rs b/src/consensus/parlia/util.rs index 95e6b0ae..af978844 100644 --- a/src/consensus/parlia/util.rs +++ b/src/consensus/parlia/util.rs @@ -519,13 +519,22 @@ mod tests { ); let chain_id = ChainId(56_u64); - let signer = ECDSASigner::from_secret(hex::decode("47ebe68bdae5faa801fd6c6d4e43d9f142cefd8fa46bb822dd32c94fbc5eaa7c").unwrap().as_slice()); + let signer = ECDSASigner::from_secret( + hex::decode("47ebe68bdae5faa801fd6c6d4e43d9f142cefd8fa46bb822dd32c94fbc5eaa7c") + .unwrap() + .as_slice(), + ); let sig = signer.sign_block(&header, chain_id).unwrap(); let mut extra = BytesMut::with_capacity(header.extra_data.len()); extra.extend_from_slice(&header.extra_data[..header.extra_data.len() - EXTRA_SEAL_LEN]); extra.extend_from_slice(&sig[..]); header.extra_data = extra.freeze(); - info!("sealed header {}:{}, header: {:?}", header.number.0, header.hash(), header); + info!( + "sealed header {}:{}, header: {:?}", + header.number.0, + header.hash(), + header + ); let addr = recover_creator(&header, chain_id).unwrap(); assert_eq!( @@ -570,6 +579,7 @@ mod tests { balances: Default::default(), p2p: P2PParams { bootnodes: vec![], + static_peers: vec![], dns: None, }, }; diff --git a/src/mining/mod.rs b/src/mining/mod.rs index b495f2e6..b6971e1c 100644 --- a/src/mining/mod.rs +++ b/src/mining/mod.rs @@ -54,58 +54,87 @@ where { let num_stages = self.stages.len(); + let mut previous_stage = None; + for (stage_index, stage) in self.stages.iter_mut().enumerate() { let stage_started = Instant::now(); - let stage_id = stage.id().0; - - let success = async { - info!("RUNNING"); - - let input = StageInput { - restarted: false, - first_started_at: (stage_started, Some(last_block)), - previous_stage: None, - stage_progress: Some(last_block), - }; + let stage_id = stage.id(); + let input = StageInput { + restarted: false, + first_started_at: (stage_started, Some(last_block)), + previous_stage, + stage_progress: Some(last_block), + }; + let exec_output = async { + info!( + "RUNNING from {}", + input + .stage_progress + .map(|s| s.to_string()) + .unwrap_or_else(|| "genesis".to_string()) + ); let output = stage.execute(tx, input).await; - let new_block = last_block.0 + 1; - - let success = matches!( - &output, - Ok(ExecOutput::Progress { stage_progress, .. }) if stage_progress.0 == new_block - ); - - if success { - info!("DONE in {}", format_duration(stage_started.elapsed(), true)); - } else { - warn!( - "Creating a block proposal for height {} failed: {}", - new_block, - if let Err(e) = output { - format!("{:?}", e) + // Nothing here, pass along. + match &output { + Ok(ExecOutput::Progress { + done, + stage_progress, + .. + }) => { + if *done { + info!( + "DONE @ {} in {}", + stage_progress, + format_duration(Instant::now() - stage_started, true) + ); } else { - "".to_string() - }, - ); + warn!( + "Stage not done, with no reason @ {} in {}, exit...", + stage_progress, + format_duration(Instant::now() - stage_started, true) + ); + } + } + Ok(ExecOutput::Unwind { unwind_to }) => { + warn!("Stage trigger unwind to {}, exit...", unwind_to); + } + Err(err) => { + warn!("mining err: {:?}, exit...", err); + } } - success + output } .instrument(span!( Level::INFO, "", - " {}/{} Mining {} ", + " {}/{} {} ", stage_index + 1, num_stages, - stage_id, + AsRef::::as_ref(&stage_id) )) .await; - if !success { - break; - } + // Check how stage run went. + let done_progress = match exec_output { + Ok(ExecOutput::Progress { + stage_progress, + done, + .. + }) => { + // Stage is "done", that is cannot make any more progress at this time. + if !done { + break; + } + stage_progress + } + _ => { + break; + } + }; + previous_stage = Some((stage_id, done_progress)) } } } diff --git a/src/mining/proposal.rs b/src/mining/proposal.rs index 193505de..92bf8dd9 100644 --- a/src/mining/proposal.rs +++ b/src/mining/proposal.rs @@ -67,9 +67,13 @@ pub fn create_block_header( parent_header: &BlockHeader, config: Arc>, ) -> anyhow::Result { - let timestamp = now(); + let mut timestamp = now(); if timestamp <= parent_header.timestamp { - bail!("Current system time is earlier than existing block timestamp."); + // TODO add forceTime option + // Sanity check the timestamp correctness, recap the timestamp + // to parent+1 if the mutation is allowed. + // bail!("Current system time is earlier than existing block timestamp."); + timestamp = parent_header.timestamp + 1; } let parent_gas_limit = parent_header.gas_limit; @@ -82,13 +86,13 @@ pub fn create_block_header( beneficiary: config.get_ether_base(), difficulty: U256::ZERO, extra_data: config.get_extra_data(), - timestamp: timestamp, + timestamp, ommers_hash: H256::zero(), state_root: H256::zero(), transactions_root: H256::zero(), receipts_root: H256::zero(), logs_bloom: ethereum_types::Bloom::zero(), - gas_limit: gas_limit, + gas_limit, gas_used: 0, mix_hash: H256::zero(), nonce: ethereum_types::H64::zero(), diff --git a/src/models/chainspec.rs b/src/models/chainspec.rs index f737d548..4e47be4c 100644 --- a/src/models/chainspec.rs +++ b/src/models/chainspec.rs @@ -620,6 +620,8 @@ pub enum Precompile { pub struct P2PParams { #[serde(default, skip_serializing_if = "Vec::is_empty")] pub bootnodes: Vec, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub static_peers: Vec, #[serde( default, skip_serializing_if = "Option::is_none", @@ -715,6 +717,7 @@ mod tests { "enode://343149e4feefa15d882d9fe4ac7d88f885bd05ebb735e547f12e12080a9fa07c8014ca6fd7f373123488102fe5e34111f8509cf0b7de3f5b44339c9f25e87cb8@52.3.158.184:30303", "enode://b6b28890b006743680c52e64e0d16db57f28124885595fa03a562be1d2bf0f3a1da297d56b13da25fb992888fd556d4c1a27b1f39d531bde7de1921c90061cc6@159.89.28.211:30303", ].into_iter().map(ToString::to_string).collect(), + static_peers: vec![], dns: Some("all.rinkeby.ethdisco.net".to_owned()), } }, diff --git a/src/p2p/node/builder.rs b/src/p2p/node/builder.rs index 1aef1309..880996cc 100644 --- a/src/p2p/node/builder.rs +++ b/src/p2p/node/builder.rs @@ -9,6 +9,7 @@ use parking_lot::{Mutex, RwLock}; use std::sync::Arc; use tokio::sync::{watch, Notify}; use tonic::transport::Channel; +use tracing::info; #[derive(Debug)] pub struct NodeBuilder { @@ -30,17 +31,20 @@ impl NodeBuilder { } pub fn add_sentry(mut self, endpoint: impl Into) -> Self { - self.sentries.push(Sentry::new( - Channel::builder(endpoint.into()).connect_lazy(), - )); + let uri = endpoint.into(); + info!("add_sentry endpoint {:?}", uri); + self.sentries + .push(Sentry::new(Channel::builder(uri).connect_lazy())); self } - pub fn set_chain_head(mut self, height: BlockNumber, hash: H256, td: U256) -> Self { + pub fn set_chain_head(mut self, height: BlockNumber, hash: H256, parent_hash: H256, td: U256) -> Self { let status = Status { height, hash, total_difficulty: H256::from(td.to_be_bytes()), + parent_hash, + td }; self.status = Some(status); self diff --git a/src/p2p/node/node.rs b/src/p2p/node/node.rs index 9bd83a3d..3050adc7 100644 --- a/src/p2p/node/node.rs +++ b/src/p2p/node/node.rs @@ -109,6 +109,14 @@ impl Node { } } Message::BlockHeaders(ref headers) => { + if headers.headers.len() > 0 { + info!( + "recv a BlockHeaders in handler {:?}:{:?}, len {}", + headers.headers[0].number, + headers.headers[0].hash(), + headers.headers.len() + ); + } if let Some(max_header) = headers.headers.iter().max_by_key(|header| header.number) { @@ -140,6 +148,12 @@ impl Node { } } Message::NewBlock(inner) => { + info!( + "recv a new block in handler {:?}:{:?}, from {}", + inner.block.header.number, + inner.block.header.hash(), + inner.block.header.beneficiary + ); let hash = inner.block.header.hash(); let number = inner.block.header.number; @@ -296,6 +310,7 @@ impl Node { height, hash, total_difficulty, + .. } = *self.status.read(); let config = &self.config; let status_data = grpc_sentry::StatusData { diff --git a/src/p2p/types/status.rs b/src/p2p/types/status.rs index b61615a7..80aa37f2 100644 --- a/src/p2p/types/status.rs +++ b/src/p2p/types/status.rs @@ -4,15 +4,19 @@ use crate::models::*; pub struct Status { pub height: BlockNumber, pub hash: H256, + pub parent_hash: H256, pub total_difficulty: H256, + pub td: U256, } impl Status { - pub fn new(height: BlockNumber, hash: H256, td: U256) -> Self { + pub fn new(height: BlockNumber, hash: H256, parent_hash: H256, td: U256) -> Self { Self { height, hash, + parent_hash, total_difficulty: H256::from(td.to_be_bytes()), + td } } } @@ -26,7 +30,9 @@ impl<'a> From<&'a ChainConfig> for Status { Self { height, hash, + parent_hash: EMPTY_HASH, total_difficulty, + td: config.chain_spec.genesis.seal.difficulty(), } } } diff --git a/src/sentry/mod.rs b/src/sentry/mod.rs index 8aa5397b..ce00dfbc 100644 --- a/src/sentry/mod.rs +++ b/src/sentry/mod.rs @@ -515,11 +515,20 @@ pub async fn run( discovery_tasks.insert("discv4".to_string(), Box::pin(task)); } - if !opts.static_peers.is_empty() { - info!("Enabling static peers: {:?}", opts.static_peers); + let static_peers = if opts.static_peers.is_empty() { + network_params + .static_peers + .iter() + .map(|b| NR(b.parse().unwrap())) + .collect::>() + } else { + opts.static_peers + }; + if !static_peers.is_empty() { + info!("Enabling static peers: {:?}", static_peers); let task = StaticNodes::new( - opts.static_peers + static_peers .iter() .map(|&NR(NodeRecord { addr, id })| (addr, id)) .collect::>(), diff --git a/src/stagedsync/mod.rs b/src/stagedsync/mod.rs index a37da84d..8bed6b0d 100644 --- a/src/stagedsync/mod.rs +++ b/src/stagedsync/mod.rs @@ -71,7 +71,6 @@ where post_cycle_callback: Option BoxFuture<'static, ()> + Send + Sync + 'static>>, staged_mining: Option>, - pub is_mining: bool, } impl<'db, E> Default for StagedSync<'db, E> @@ -101,7 +100,6 @@ where delay_after_sync: None, post_cycle_callback: None, staged_mining: None, - is_mining: false, } } @@ -183,15 +181,9 @@ where let mut bad_block = None; let mut unwind_to = self.start_with_unwind; 'run_loop: loop { - info!( - "stages len {}, is_mining {}", - self.stages.len(), - self.is_mining - ); self.current_stage_sender.send(None).unwrap(); let mut tx = db.begin_mutable()?; - info!("stages len {}, begin tx", self.stages.len()); // Start with unwinding if it's been requested. if let Some(to) = unwind_to.take() { @@ -259,9 +251,7 @@ where } bad_block = None; - if !self.is_mining { - tx.commit()?; - } + tx.commit()?; } else { // Now that we're done with unwind, let's roll. @@ -432,9 +422,7 @@ where { // Commit and restart transaction. debug!("Commit requested"); - if !self.is_mining { - tx.commit()?; - } + tx.commit()?; debug!("Commit complete"); tx = db.begin_mutable()?; } @@ -552,9 +540,7 @@ where } } } - if !self.is_mining { - tx.commit()?; - } + tx.commit()?; let last_block = receipts.last().map_or(BlockNumber(0), |r| r.progress); @@ -567,6 +553,12 @@ where .await } + // if enable mining, start it + if let Some(mining) = &mut self.staged_mining { + let mut tx = db.begin_mutable()?; + mining.run(&mut tx, last_block).await; + } + if let Some(minimum_progress) = minimum_progress { if let Some(max_block) = self.max_block { if minimum_progress >= max_block { @@ -581,6 +573,11 @@ where } } } + + /// enable mining after sync done + pub fn enable_mining(&mut self, staged_mining: StagedMining<'db, E>) { + self.staged_mining = Some(staged_mining); + } } pub fn format_duration(dur: Duration, subsec_millis: bool) -> String { diff --git a/src/stages/headers.rs b/src/stages/headers.rs index 9e450bdf..a0d51900 100644 --- a/src/stages/headers.rs +++ b/src/stages/headers.rs @@ -155,12 +155,13 @@ where let current_chain_tip = loop { let _ = chain_tip.changed().await; let (n, _) = *chain_tip.borrow(); + info!("try get Chain tip={}", n); if n > prev_progress { break n; } }; - debug!("Chain tip={}", current_chain_tip); + info!("Chain tip={}", current_chain_tip); let (mut target_block, mut reached_tip) = Self::forward_set_target_block( prev_progress, @@ -205,6 +206,10 @@ where { // Check that downloaded headers attach to present chain if let Some((_, first_downloaded)) = downloaded.first() { + info!( + "prev_progress_hash {}:{:?}, first_downloaded: {:?}", + prev_progress, prev_progress_hash, first_downloaded + ); if let Some((_, last_buffered)) = headers.last() { if last_buffered.hash() != first_downloaded.parent_hash { // Does not attach to buffered chain, just pop last header and download again @@ -555,6 +560,13 @@ impl HeaderDownload { if let Some(first) = headers.first() { if prev_progress_header.hash() != first.1.parent_hash { + info!( + "Difficulty graph first, prev {}:{:?}, first: {}:{:?}, will unwind", + prev_progress_header.number, + prev_progress_header.hash(), + first.1.number, + first.1.parent_hash + ); return Ok(None); } } @@ -652,9 +664,10 @@ impl HeaderDownload { txn: &'tx mut MdbxTransaction<'_, RW, E>, height: BlockNumber, ) -> anyhow::Result<()> { + let block = txn.get(tables::Header, height)?.unwrap(); let hash = txn.get(tables::CanonicalHeader, height)?.unwrap(); let td = txn.get(tables::HeadersTotalDifficulty, height)?.unwrap(); - let status = Status::new(height, hash, td); + let status = Status::new(height, hash, block.parent_hash, td); self.node.update_chain_head(Some(status)).await; Ok(()) } diff --git a/src/stages/interhashes.rs b/src/stages/interhashes.rs index 2e043fed..d6524f41 100644 --- a/src/stages/interhashes.rs +++ b/src/stages/interhashes.rs @@ -1,16 +1,19 @@ use crate::{ accessors, - consensus::DuoError, + consensus::{DuoError, ValidationError}, kv::mdbx::*, models::*, stagedsync::stage::{ExecOutput, Stage, StageError, StageInput, UnwindInput, UnwindOutput}, - stages::stage_util::should_do_clean_promotion, + stages::{stage_util::should_do_clean_promotion, MiningBlock}, trie::*, StageId, }; use anyhow::format_err; use async_trait::async_trait; -use std::{cmp, sync::Arc}; +use std::{ + cmp, + sync::{Arc, Mutex}, +}; use tempfile::TempDir; use tracing::*; @@ -21,13 +24,19 @@ pub const INTERMEDIATE_HASHES: StageId = StageId("IntermediateHashes"); pub struct Interhashes { temp_dir: Arc, clean_promotion_threshold: u64, + mining_block: Option>>, } impl Interhashes { - pub fn new(temp_dir: Arc, clean_promotion_threshold: Option) -> Self { + pub fn new( + temp_dir: Arc, + clean_promotion_threshold: Option, + mining_block: Option>>, + ) -> Self { Self { temp_dir, clean_promotion_threshold: clean_promotion_threshold.unwrap_or(1_000_000_000_000), + mining_block, } } } @@ -57,10 +66,6 @@ where let past_progress = input.stage_progress.unwrap_or(genesis); if max_block > past_progress { - let block_state_root = accessors::chain::header::read(tx, max_block)? - .ok_or_else(|| format_err!("No header for block {}", max_block))? - .state_root; - let trie_root = if should_do_clean_promotion( tx, genesis, @@ -69,15 +74,10 @@ where self.clean_promotion_threshold, )? { debug!("Regenerating intermediate hashes"); - regenerate_intermediate_hashes(tx, self.temp_dir.as_ref(), Some(block_state_root)) + regenerate_intermediate_hashes(tx, self.temp_dir.as_ref(), None) } else { debug!("Incrementing intermediate hashes"); - increment_intermediate_hashes( - tx, - self.temp_dir.as_ref(), - past_progress, - Some(block_state_root), - ) + increment_intermediate_hashes(tx, self.temp_dir.as_ref(), past_progress, None) } .map_err(|e| match e { DuoError::Validation(error) => StageError::Validation { @@ -89,7 +89,24 @@ where } })?; - info!("Block #{} state root OK: {:?}", max_block, trie_root) + info!("Block #{} state root OK: {:?}", max_block, trie_root); + + if let Some(mining_block) = self.mining_block.as_ref() { + mining_block.lock().unwrap().header.state_root = trie_root; + } else { + // check header root + let block_state_root = accessors::chain::header::read(tx, max_block)? + .ok_or_else(|| format_err!("No header for block {}", max_block))? + .state_root; + + if block_state_root != trie_root { + return Err(DuoError::Validation(ValidationError::WrongStateRoot { + expected: block_state_root, + got: trie_root, + }) + .into()); + } + } }; Ok(ExecOutput::Progress { diff --git a/src/stages/mining_create_block.rs b/src/stages/mining_create_block.rs index 6e1f54d9..19941573 100644 --- a/src/stages/mining_create_block.rs +++ b/src/stages/mining_create_block.rs @@ -8,6 +8,7 @@ use crate::{ state::*, }, models::*, + p2p::node::Node, stagedsync::stage::*, state::Buffer, StageId, @@ -33,7 +34,7 @@ pub const CREATE_BLOCK: StageId = StageId("CreateBlock"); // to override the extra-data in to prevent no-fork attacks. pub const DAOFORKEXTRARANG: i32 = 10; -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MiningBlock { pub header: BlockHeader, pub ommers: ArrayVec, @@ -56,6 +57,7 @@ pub struct CreateBlock { pub mining_block: Arc>, pub mining_config: Arc>, pub chain_spec: ChainSpec, + pub node: Arc, } #[async_trait] @@ -75,7 +77,7 @@ where where 'db: 'tx, { - let parent_number = input.stage_progress.unwrap_or(BlockNumber(0)); + let parent_number = input.stage_progress.unwrap(); let parent_header = get_header(tx, parent_number)?; // TODO, complete the remaining tx related part after txpool ready. diff --git a/src/stages/mining_exec_block.rs b/src/stages/mining_exec_block.rs index 9d6d976b..77011e05 100644 --- a/src/stages/mining_exec_block.rs +++ b/src/stages/mining_exec_block.rs @@ -13,12 +13,13 @@ use crate::{ state::*, }, models::{ - BlockBodyWithSenders, BlockHeader, BlockNumber, ChainSpec, MessageWithSender, + BlockBodyWithSenders, BlockHeader, BlockNumber, Bloom, ChainSpec, MessageWithSender, MessageWithSignature, }, res::chainspec, stagedsync::stage::*, state::IntraBlockState, + trie::root_hash, Buffer, StageId, }; use anyhow::{bail, format_err}; @@ -28,13 +29,14 @@ use hex::FromHex; use mdbx::{EnvironmentKind, RW}; use num_bigint::{BigInt, Sign}; use num_traits::ToPrimitive; +use primitive_types::H256; use std::{ cmp::Ordering, sync::{Arc, Mutex}, time::{Duration, Instant}, }; use tokio::io::copy; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; pub const STAGE_EXEC_BLOCK: StageId = StageId("StageExecBlock"); // DAOForkExtraRange is the number of consecutive blocks from the DAO fork point @@ -191,22 +193,24 @@ fn execute_mining_blocks( more_txs ); - buffer.insert_receipts(block_number, receipts); + // save header + let mut cursor_header = tx.cursor(tables::Header).unwrap(); + cursor_header.put(header.number, header.clone()).unwrap(); - // TODO MDBX_EKEYMISMATCH: The given key value is mismatched to the current cursor position - // { - // let mut c = tx.cursor(tables::CallTraceSet).unwrap(); - // for (address, CallTracerFlags { from, to }) in call_tracer.into_sorted_iter() { - // c.append_dup(header.number, CallTraceSetEntry { address, from, to }).unwrap(); - // } - // } - // buffer.write_to_db().unwrap(); + // calculate receipts root, bloom and gasUsed + current.header.gas_used = receipts.last().map(|r| r.cumulative_gas_used).unwrap_or(0); + current.header.logs_bloom = receipts + .iter() + .fold(Bloom::zero(), |bloom, r| bloom | r.bloom); + current.header.receipts_root = root_hash(&receipts); + buffer.insert_receipts(block_number, receipts); + buffer.write_to_db().unwrap(); // replace mining block txs - for tx in more_txs.unwrap_or(Vec::new()) { + for t in more_txs.unwrap_or(Vec::new()) { current.transactions.push(MessageWithSignature { - message: tx.message, - signature: tx.signature, + message: t.message, + signature: t.signature, }); } diff --git a/src/stages/mining_finish_block.rs b/src/stages/mining_finish_block.rs index 004047c7..c3c70a20 100644 --- a/src/stages/mining_finish_block.rs +++ b/src/stages/mining_finish_block.rs @@ -113,28 +113,13 @@ where warn!("mining finish send pending_result_ch err: {:?}", err); } - let success = self + let _success = self .mining_config .lock() .unwrap() .consensus - .seal(tx, &mut block)?; + .seal(self.node.clone(), tx, block)?; - if success { - // TODO set correct TD - let td = block.header.difficulty; - // Broadcast the mined block to other p2p nodes. - let sent_request_id = rand::thread_rng().gen(); - // TODO add mined block into stageSync - info!("send_new_mining_block to others, block: {:?}", block); - self.node - .send_new_mining_block( - sent_request_id, - block, - td, - ) - .await; - } Ok(ExecOutput::Progress { stage_progress: prev_stage, done: true,