diff --git a/client/beefy/src/communication/request_response/outgoing_requests_engine.rs b/client/beefy/src/communication/request_response/outgoing_requests_engine.rs index c4d3c926190e6..00ee7610dd4f0 100644 --- a/client/beefy/src/communication/request_response/outgoing_requests_engine.rs +++ b/client/beefy/src/communication/request_response/outgoing_requests_engine.rs @@ -18,21 +18,17 @@ //! Generating request logic for request/response protocol for syncing BEEFY justifications. -use beefy_primitives::{crypto::AuthorityId, BeefyApi, ValidatorSet}; +use beefy_primitives::{crypto::AuthorityId, ValidatorSet}; use codec::Encode; use futures::channel::{oneshot, oneshot::Canceled}; -use log::{debug, error, warn}; +use log::{debug, warn}; use parking_lot::Mutex; use sc_network::{PeerId, ProtocolName}; use sc_network_common::{ request_responses::{IfDisconnected, RequestFailure}, service::NetworkRequest, }; -use sp_api::ProvideRuntimeApi; -use sp_runtime::{ - generic::BlockId, - traits::{Block, NumberFor}, -}; +use sp_runtime::traits::{Block, NumberFor}; use std::{collections::VecDeque, result::Result, sync::Arc}; use crate::{ @@ -46,14 +42,19 @@ type Response = Result, RequestFailure>; /// Used to receive a response from the network. type ResponseReceiver = oneshot::Receiver; +#[derive(Clone, Debug)] +struct RequestInfo { + block: NumberFor, + active_set: ValidatorSet, +} + enum State { Idle, - AwaitingResponse(PeerId, NumberFor, ResponseReceiver), + AwaitingResponse(PeerId, RequestInfo, ResponseReceiver), } -pub struct OnDemandJustificationsEngine { +pub struct OnDemandJustificationsEngine { network: Arc, - runtime: Arc, protocol_name: ProtocolName, live_peers: Arc>>, @@ -62,21 +63,14 @@ pub struct OnDemandJustificationsEngine { state: State, } -impl OnDemandJustificationsEngine -where - B: Block, - R: ProvideRuntimeApi, - R::Api: BeefyApi, -{ +impl OnDemandJustificationsEngine { pub fn new( network: Arc, - runtime: Arc, protocol_name: ProtocolName, live_peers: Arc>>, ) -> Self { Self { network, - runtime, protocol_name, live_peers, peers_cache: VecDeque::new(), @@ -100,10 +94,15 @@ where None } - fn request_from_peer(&mut self, peer: PeerId, block: NumberFor) { - debug!(target: "beefy::sync", "🥩 requesting justif #{:?} from peer {:?}", block, peer); + fn request_from_peer(&mut self, peer: PeerId, req_info: RequestInfo) { + debug!( + target: "beefy::sync", + "🥩 requesting justif #{:?} from peer {:?}", + req_info.block, + peer, + ); - let payload = JustificationRequest:: { begin: block }.encode(); + let payload = JustificationRequest:: { begin: req_info.block }.encode(); let (tx, rx) = oneshot::channel(); @@ -115,11 +114,13 @@ where IfDisconnected::ImmediateError, ); - self.state = State::AwaitingResponse(peer, block, rx); + self.state = State::AwaitingResponse(peer, req_info, rx); } - /// If no other request is in progress, start new justification request for `block`. - pub fn request(&mut self, block: NumberFor) { + /// Start new justification request for `block`, if no other request is in progress. + /// + /// `active_set` will be used to verify validity of potential responses. + pub fn request(&mut self, block: NumberFor, active_set: ValidatorSet) { // ignore new requests while there's already one pending if matches!(self.state, State::AwaitingResponse(_, _, _)) { return @@ -129,7 +130,7 @@ where // Start the requests engine - each unsuccessful received response will automatically // trigger a new request to the next peer in the `peers_cache` until there are none left. if let Some(peer) = self.try_next_peer() { - self.request_from_peer(peer, block); + self.request_from_peer(peer, RequestInfo { block, active_set }); } else { debug!(target: "beefy::sync", "🥩 no good peers to request justif #{:?} from", block); } @@ -138,11 +139,10 @@ where /// Cancel any pending request for block numbers smaller or equal to `block`. pub fn cancel_requests_older_than(&mut self, block: NumberFor) { match &self.state { - State::AwaitingResponse(_, number, _) if *number <= block => { + State::AwaitingResponse(_, req_info, _) if req_info.block <= block => { debug!( - target: "beefy::sync", - "🥩 cancel pending request for justification #{:?}", - number + target: "beefy::sync", "🥩 cancel pending request for justification #{:?}", + req_info.block ); self.state = State::Idle; }, @@ -153,8 +153,7 @@ where fn process_response( &mut self, peer: PeerId, - block: NumberFor, - validator_set: &ValidatorSet, + req_info: &RequestInfo, response: Result, ) -> Result, Error> { response @@ -162,7 +161,7 @@ where debug!( target: "beefy::sync", "🥩 for on demand justification #{:?}, peer {:?} hung up: {:?}", - block, peer, e + req_info.block, peer, e ); Error::InvalidResponse })? @@ -170,60 +169,49 @@ where debug!( target: "beefy::sync", "🥩 for on demand justification #{:?}, peer {:?} error: {:?}", - block, peer, e + req_info.block, peer, e ); Error::InvalidResponse }) .and_then(|encoded| { - decode_and_verify_finality_proof::(&encoded[..], block, &validator_set).map_err( - |e| { - debug!( - target: "beefy::sync", - "🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}", - block, peer, e - ); - Error::InvalidResponse - }, + decode_and_verify_finality_proof::( + &encoded[..], + req_info.block, + &req_info.active_set, ) + .map_err(|e| { + debug!( + target: "beefy::sync", + "🥩 for on demand justification #{:?}, peer {:?} responded with invalid proof: {:?}", + req_info.block, peer, e + ); + Error::InvalidResponse + }) }) } pub async fn next(&mut self) -> Option> { - let (peer, block, resp) = match &mut self.state { + let (peer, req_info, resp) = match &mut self.state { State::Idle => { futures::pending!(); // Doesn't happen as 'futures::pending!()' is an 'await' barrier that never passes. return None }, - State::AwaitingResponse(peer, block, receiver) => { + State::AwaitingResponse(peer, req_info, receiver) => { let resp = receiver.await; - (*peer, *block, resp) + (*peer, req_info.clone(), resp) }, }; // We received the awaited response. Our 'receiver' will never generate any other response, // meaning we're done with current state. Move the engine to `State::Idle`. self.state = State::Idle; - let block_id = BlockId::number(block); - let validator_set = self - .runtime - .runtime_api() - .validator_set(&block_id) - .map_err(|e| { - error!(target: "beefy::sync", "🥩 Runtime API error {:?} in on-demand justif engine.", e); - e - }) - .ok()? - .or_else(|| { - error!(target: "beefy::sync", "🥩 BEEFY pallet not available for block {:?}.", block); - None - })?; - - self.process_response(peer, block, &validator_set, resp) + let block = req_info.block; + self.process_response(peer, &req_info, resp) .map_err(|_| { // No valid justification received, try next peer in our set. if let Some(peer) = self.try_next_peer() { - self.request_from_peer(peer, block); + self.request_from_peer(peer, req_info); } else { warn!(target: "beefy::sync", "🥩 ran out of peers to request justif #{:?} from", block); } diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index 9dccd4236bef3..a057a9fdc597d 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -244,7 +244,6 @@ where // The `GossipValidator` adds and removes known peers based on valid votes and network events. let on_demand_justifications = OnDemandJustificationsEngine::new( network.clone(), - runtime.clone(), justifications_protocol_name, known_peers, ); @@ -295,7 +294,7 @@ where persisted_state, }; - let worker = worker::BeefyWorker::<_, _, _, _, _>::new(worker_params); + let worker = worker::BeefyWorker::<_, _, _, _>::new(worker_params); futures::future::join( worker.run(block_import_justif, finality_notifications), @@ -377,17 +376,8 @@ where break state } - // Check if we should move up the chain. - let parent_hash = *header.parent_hash(); - if *header.number() == One::one() || - runtime - .runtime_api() - .validator_set(&BlockId::hash(parent_hash)) - .ok() - .flatten() - .is_none() - { - // We've reached pallet genesis, initialize voter here. + if *header.number() == One::one() { + // We've reached chain genesis, initialize voter here. let genesis_num = *header.number(); let genesis_set = expect_validator_set(runtime, BlockId::hash(header.hash())) .and_then(genesis_set_sanity_check)?; @@ -408,6 +398,19 @@ where sessions.push_front(Rounds::new(*header.number(), active)); } + // Check if state is still available if we move up the chain. + let parent_hash = *header.parent_hash(); + runtime + .runtime_api() + .validator_set(&BlockId::hash(parent_hash)) + .ok() + .flatten() + .ok_or_else(|| { + let msg = format!("{}. Could not initialize BEEFY voter.", parent_hash); + error!(target: "beefy", "🥩 {}", msg); + ClientError::Consensus(sp_consensus::Error::StateUnavailable(msg)) + })?; + // Move up the chain. header = blockchain.expect_header(BlockId::Hash(parent_hash))?; }; diff --git a/client/beefy/src/round.rs b/client/beefy/src/round.rs index 7a8cc4171a155..48d3d087299d0 100644 --- a/client/beefy/src/round.rs +++ b/client/beefy/src/round.rs @@ -89,6 +89,10 @@ where } } + pub(crate) fn validator_set(&self) -> &ValidatorSet { + &self.validator_set + } + pub(crate) fn validator_set_id(&self) -> ValidatorSetId { self.validator_set.id() } diff --git a/client/beefy/src/worker.rs b/client/beefy/src/worker.rs index 6726fa4375387..9669939e594c1 100644 --- a/client/beefy/src/worker.rs +++ b/client/beefy/src/worker.rs @@ -31,8 +31,8 @@ use crate::{ }; use beefy_primitives::{ crypto::{AuthorityId, Signature}, - BeefyApi, Commitment, ConsensusLog, MmrRootHash, Payload, PayloadProvider, SignedCommitment, - ValidatorSet, VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, + Commitment, ConsensusLog, Payload, PayloadProvider, SignedCommitment, ValidatorSet, + VersionedFinalityProof, VoteMessage, BEEFY_ENGINE_ID, }; use codec::{Codec, Decode, Encode}; use futures::{stream::Fuse, FutureExt, StreamExt}; @@ -41,10 +41,9 @@ use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, Header use sc_network_common::service::{NetworkEventStream, NetworkRequest}; use sc_network_gossip::GossipEngine; use sc_utils::notification::NotificationReceiver; -use sp_api::{BlockId, ProvideRuntimeApi}; +use sp_api::BlockId; use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; use sp_consensus::SyncOracle; -use sp_mmr_primitives::MmrApi; use sp_runtime::{ generic::OpaqueDigestItemId, traits::{Block, Header, NumberFor, Zero}, @@ -166,13 +165,13 @@ impl VoterOracle { Ok(()) } - /// Return current pending mandatory block, if any. - pub fn mandatory_pending(&self) -> Option> { + /// Return current pending mandatory block, if any, plus its active validator set. + pub fn mandatory_pending(&self) -> Option<(NumberFor, ValidatorSet)> { self.sessions.front().and_then(|round| { if round.mandatory_done() { None } else { - Some(round.session_start()) + Some((round.session_start(), round.validator_set().clone())) } }) } @@ -239,14 +238,14 @@ impl VoterOracle { } } -pub(crate) struct WorkerParams { +pub(crate) struct WorkerParams { pub backend: Arc, pub payload_provider: P, pub network: N, pub key_store: BeefyKeystore, pub gossip_engine: GossipEngine, pub gossip_validator: Arc>, - pub on_demand_justifications: OnDemandJustificationsEngine, + pub on_demand_justifications: OnDemandJustificationsEngine, pub links: BeefyVoterLinks, pub metrics: Option, pub persisted_state: PersistedState, @@ -287,7 +286,7 @@ impl PersistedState { } /// A BEEFY worker plays the BEEFY protocol -pub(crate) struct BeefyWorker { +pub(crate) struct BeefyWorker { // utilities backend: Arc, payload_provider: P, @@ -297,7 +296,7 @@ pub(crate) struct BeefyWorker { // communication gossip_engine: GossipEngine, gossip_validator: Arc>, - on_demand_justifications: OnDemandJustificationsEngine, + on_demand_justifications: OnDemandJustificationsEngine, // channels /// Links between the block importer, the background voter and the RPC layer. @@ -314,13 +313,11 @@ pub(crate) struct BeefyWorker { persisted_state: PersistedState, } -impl BeefyWorker +impl BeefyWorker where B: Block + Codec, BE: Backend, P: PayloadProvider, - R: ProvideRuntimeApi, - R::Api: BeefyApi + MmrApi>, N: NetworkEventStream + NetworkRequest + SyncOracle + Send + Sync + Clone + 'static, { /// Return a new BEEFY worker instance. @@ -329,7 +326,7 @@ where /// BEEFY pallet has been deployed on-chain. /// /// The BEEFY pallet is needed in order to keep track of the BEEFY authority set. - pub(crate) fn new(worker_params: WorkerParams) -> Self { + pub(crate) fn new(worker_params: WorkerParams) -> Self { let WorkerParams { backend, payload_provider, @@ -551,10 +548,15 @@ where // New state is persisted after finalization. self.finalize(finality_proof)?; } else { - if self_vote || self.voting_oracle().mandatory_pending() == Some(round.1) { - // Persist state after handling self vote to avoid double voting in case - // of voter restarts. - // Also persist state after handling mandatory block vote. + let mandatory_round = self + .voting_oracle() + .mandatory_pending() + .map(|p| p.0 == round.1) + .unwrap_or(false); + // Persist state after handling self vote to avoid double voting in case + // of voter restarts. + // Also persist state after handling mandatory block vote. + if self_vote || mandatory_round { crate::aux_schema::write_voter_state(&*self.backend, &self.persisted_state) .map_err(|e| Error::Backend(e.to_string()))?; } @@ -784,12 +786,10 @@ where } // If the current target is a mandatory block, // make sure there's also an on-demand justification request out for it. - if let Some(block) = self.voting_oracle().mandatory_pending() { + if let Some((block, active)) = self.voting_oracle().mandatory_pending() { // This only starts new request if there isn't already an active one. - self.on_demand_justifications.request(block); + self.on_demand_justifications.request(block, active); } - } else { - debug!(target: "beefy", "🥩 Skipping voting while major syncing."); } } @@ -993,7 +993,6 @@ pub(crate) mod tests { Block, Backend, MmrRootProvider, - TestApi, Arc>, > { let keystore = create_beefy_keystore(*key); @@ -1024,7 +1023,6 @@ pub(crate) mod tests { GossipEngine::new(network.clone(), "/beefy/1", gossip_validator.clone(), None); let on_demand_justifications = OnDemandJustificationsEngine::new( network.clone(), - api.clone(), "/beefy/justifs/1".into(), known_peers, ); @@ -1050,7 +1048,7 @@ pub(crate) mod tests { on_demand_justifications, persisted_state, }; - BeefyWorker::<_, _, _, _, _>::new(worker_params) + BeefyWorker::<_, _, _, _>::new(worker_params) } #[test]