Skip to content

Commit

Permalink
reconstruct and fmt the sync codes (#3969)
Browse files Browse the repository at this point in the history
* debug sync successfully

* dag sync in a synchronized way

* fix sync bugs

* fix bug: update the dag accumulator at necessary point

* reconstruct and fmt the sync
  • Loading branch information
jackzhhuang authored Sep 20, 2023
1 parent e361e0c commit fe9ec56
Show file tree
Hide file tree
Showing 15 changed files with 499 additions and 388 deletions.
29 changes: 16 additions & 13 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use starcoin_open_block::OpenedBlock;
use starcoin_state_api::{AccountStateReader, ChainStateReader, ChainStateWriter};
use starcoin_statedb::ChainStateDB;
use starcoin_storage::flexi_dag::SyncFlexiDagSnapshot;
use starcoin_storage::Store;
use starcoin_storage::storage::CodecKVStore;
use starcoin_storage::Store;
use starcoin_time_service::TimeService;
use starcoin_types::block::BlockIdAndNumber;
use starcoin_types::contract_event::ContractEventInfo;
Expand Down Expand Up @@ -108,7 +108,10 @@ impl BlockChain {
)),
None => None,
};
let dag_snapshot_tips = storage.get_accumulator_snapshot_storage().get(head_id)?.map(|snapshot| snapshot.child_hashes);
let dag_snapshot_tips = storage
.get_accumulator_snapshot_storage()
.get(head_id)?
.map(|snapshot| snapshot.child_hashes);
let mut chain = Self {
genesis_hash: genesis,
time_service,
Expand All @@ -123,11 +126,7 @@ impl BlockChain {
storage.as_ref(),
),
status: ChainStatusWithBlock {
status: ChainStatus::new(
head_block.header.clone(),
block_info,
dag_snapshot_tips,
),
status: ChainStatus::new(head_block.header.clone(), block_info, dag_snapshot_tips),
head: head_block,
},
statedb: chain_state,
Expand Down Expand Up @@ -638,21 +637,25 @@ impl BlockChain {
);
Ok(())
}

pub fn dag_parents_in_tips(&self, dag_parents: Vec<HashValue>) -> Result<bool> {
Ok(dag_parents.into_iter().all(|parent| {
match &self.status.status.tips_hash {
Ok(dag_parents
.into_iter()
.all(|parent| match &self.status.status.tips_hash {
Some(tips) => tips.contains(&parent),
None => false,
}
}))
}))
}

pub fn is_head_of_dag_accumulator(&self, next_tips: Vec<HashValue>) -> Result<bool> {
let key = Self::calculate_dag_accumulator_key(next_tips)?;
let next_tips_info = self.storage.get_dag_accumulator_info(key)?;

return Ok(next_tips_info == self.dag_accumulator.as_ref().map(|accumulator| accumulator.get_info()));
return Ok(next_tips_info
== self
.dag_accumulator
.as_ref()
.map(|accumulator| accumulator.get_info()));
}
}

Expand Down
2 changes: 1 addition & 1 deletion commons/stream-task/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;

#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CollectorState {
/// Collector is enough, do not feed more item, finish task.
Enough,
Expand Down
3 changes: 1 addition & 2 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use futures::executor::block_on;
use futures_timer::Delay;
use network_api::{PeerProvider, PeerSelector, PeerStrategy};
use starcoin_account_service::{AccountEventService, AccountService, AccountStorage};
use starcoin_accumulator::node::AccumulatorStoreType;
use starcoin_block_relayer::BlockRelayer;
use starcoin_chain_notify::ChainNotifyHandlerService;
use starcoin_chain_service::ChainReaderService;
Expand Down Expand Up @@ -46,7 +45,7 @@ use starcoin_storage::db_storage::DBStorage;
use starcoin_storage::errors::StorageInitError;
use starcoin_storage::metrics::StorageMetrics;
use starcoin_storage::storage::StorageInstance;
use starcoin_storage::{BlockStore, Storage, Store};
use starcoin_storage::{BlockStore, Storage};
use starcoin_stratum::service::{StratumService, StratumServiceFactory};
use starcoin_stratum::stratum::{Stratum, StratumFactory};
use starcoin_sync::announcement::AnnouncementService;
Expand Down
5 changes: 4 additions & 1 deletion storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,10 @@ impl SyncFlexiDagStore for Storage {

// for block chain
new_tips.iter().try_fold((), |_, block_id| {
if let Some(t) = self.flexi_dag_storage.get_hashes_by_hash(block_id.clone())? {
if let Some(t) = self
.flexi_dag_storage
.get_hashes_by_hash(block_id.clone())?
{
if t != snapshot {
bail!("the key {} should not exists", block_id);
}
Expand Down
9 changes: 0 additions & 9 deletions sync/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,6 @@ pub struct SyncTarget {
pub peers: Vec<PeerId>,
}

#[derive(Debug, Clone)]
pub struct NewBlockChainRequest {
pub new_head_block: HashValue,
}

impl ServiceRequest for NewBlockChainRequest {
type Response = anyhow::Result<()>;
}

#[derive(Debug, Clone)]
pub struct SyncStatusRequest;

Expand Down
62 changes: 24 additions & 38 deletions sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService};
use crate::sync::{CheckSyncEvent, SyncService};
use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent};
use anyhow::{format_err, Result, Ok};
use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent, BlockConnectedFinishEvent};
use anyhow::{format_err, Ok, Result};
use network_api::PeerProvider;
use starcoin_chain_api::{ConnectBlockError, WriteableChainService, ChainReader};
use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService};
use starcoin_config::{NodeConfig, G_CRATE_VERSION};
use starcoin_consensus::BlockDAG;
use starcoin_executor::VMMetrics;
Expand All @@ -16,16 +16,14 @@ use starcoin_service_registry::{
ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler,
};
use starcoin_storage::{BlockStore, Storage};
use starcoin_sync_api::{PeerNewBlock, NewBlockChainRequest};
use starcoin_sync_api::PeerNewBlock;
use starcoin_txpool::TxPoolService;
use starcoin_types::block::ExecutedBlock;
use starcoin_types::sync_status::SyncStatus;
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown, NewHeadBlock};
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown};
use std::sync::{Arc, Mutex};
use sysinfo::{DiskExt, System, SystemExt};

use super::BlockConnectedRequest;

const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3;
const DISK_CHECKPOINT_FOR_WARN: u64 = 1024 * 1024 * 1024 * 5;

Expand Down Expand Up @@ -170,15 +168,28 @@ impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService {
fn handle_event(
&mut self,
msg: BlockConnectedEvent,
_ctx: &mut ServiceContext<BlockConnectorService>,
ctx: &mut ServiceContext<BlockConnectorService>,
) {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
if let Err(e) = self.chain_service.try_connect(block, msg.dag_parents) {
error!("Process connected block error: {:?}", e);
let feedback = msg.feedback;

match msg.action {
crate::tasks::BlockConnectAction::ConnectNewBlock => {
if let Err(e) = self.chain_service.try_connect(block, msg.dag_parents) {
error!("Process connected new block from sync error: {:?}", e);
}
}
crate::tasks::BlockConnectAction::ConnectExecutedBlock => {
if let Err(e) = self.chain_service.switch_new_main(block.header().id(), ctx) {
error!("Process connected executed block from sync error: {:?}", e);
}
}
}

feedback.map(|f| f.unbounded_send(BlockConnectedFinishEvent));
}
}

Expand Down Expand Up @@ -222,7 +233,9 @@ impl EventHandler<Self, PeerNewBlock> for BlockConnectorService {
match connect_error {
ConnectBlockError::FutureBlock(block) => {
//TODO cache future block
if let std::result::Result::Ok(sync_service) = ctx.service_ref::<SyncService>() {
if let std::result::Result::Ok(sync_service) =
ctx.service_ref::<SyncService>()
{
info!(
"BlockConnector try connect future block ({:?},{}), peer_id:{:?}, notify Sync service check sync.",
block.id(),
Expand Down Expand Up @@ -279,18 +292,6 @@ impl ServiceHandler<Self, ResetRequest> for BlockConnectorService {
}
}

impl ServiceHandler<Self, NewBlockChainRequest> for BlockConnectorService {
fn handle(
&mut self,
msg: NewBlockChainRequest,
ctx: &mut ServiceContext<BlockConnectorService>,
) -> Result<()> {
let (new_branch, dag_parents, next_tips) = self.chain_service.switch_new_main(msg.new_head_block)?;
ctx.broadcast(NewHeadBlock(Arc::new(new_branch.head_block()), Some(dag_parents), Some(next_tips)));
Ok(())
}
}

impl ServiceHandler<Self, ExecuteRequest> for BlockConnectorService {
fn handle(
&mut self,
Expand All @@ -301,18 +302,3 @@ impl ServiceHandler<Self, ExecuteRequest> for BlockConnectorService {
.execute(msg.block, msg.dag_transaction_parent)
}
}

impl ServiceHandler<Self, BlockConnectedRequest> for BlockConnectorService {
fn handle(
&mut self,
msg: BlockConnectedRequest,
_ctx: &mut ServiceContext<BlockConnectorService>,
) -> Result<()> {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
let result = self.chain_service.try_connect(block, msg.dag_parents);
result
}
}
10 changes: 0 additions & 10 deletions sync/src/block_connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,3 @@ pub struct ExecuteRequest {
impl ServiceRequest for ExecuteRequest {
type Response = anyhow::Result<ExecutedBlock>;
}

#[derive(Debug, Clone)]
pub struct BlockConnectedRequest {
pub block: Block,
pub dag_parents: Option<Vec<HashValue>>,
}

impl ServiceRequest for BlockConnectedRequest {
type Response = anyhow::Result<()>;
}
5 changes: 2 additions & 3 deletions sync/src/block_connector/test_write_dag_block_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ pub fn gen_dag_blocks(
}
}

let result = writeable_block_chain_service
.execute_dag_block_pool();
let result = writeable_block_chain_service.execute_dag_block_pool();
let result = result.unwrap();
match result {
super::write_block_chain::ConnectOk::Duplicate(block)
Expand Down Expand Up @@ -159,7 +158,7 @@ async fn test_block_chain_switch_main() {
.get_main()
.current_header()
.id(),
last_block.unwrap()
last_block.unwrap()
);

last_block = gen_fork_dag_block_chain(
Expand Down
Loading

0 comments on commit fe9ec56

Please sign in to comment.