Skip to content

Commit

Permalink
A0-1822: add substrate chain status notifier (#839)
Browse files Browse the repository at this point in the history
* add substrate chain status notifier
  • Loading branch information
maciejnems authored Jan 2, 2023
1 parent f84e20d commit 38fde8f
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 12 deletions.
26 changes: 14 additions & 12 deletions finality-aleph/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,25 @@ pub trait Finalizer<J: Justification> {
fn finalize(&self, justification: J) -> Result<(), Self::Error>;
}

/// A notification about the chain state changing.
pub enum ChainStateNotification<BI: BlockIdentifier> {
/// A notification about the chain status changing.
pub enum ChainStatusNotification<BI: BlockIdentifier> {
/// A block has been imported.
BlockImported(BI),
/// A block has been finalized.
BlockFinalized(BI),
}

/// A stream of notifications about the chain state in the database changing.
/// A stream of notifications about the chain status in the database changing.
#[async_trait::async_trait]
pub trait ChainStateNotifier<BI: BlockIdentifier> {
/// Returns a chain state notification when it is available.
async fn next(&self) -> ChainStateNotification<BI>;
pub trait ChainStatusNotifier<BI: BlockIdentifier> {
type Error: Display;

/// Returns a chain status notification when it is available.
async fn next(&mut self) -> Result<ChainStatusNotification<BI>, Self::Error>;
}

/// The state of a block in the database.
pub enum BlockState<J: Justification> {
/// The status of a block in the database.
pub enum BlockStatus<J: Justification> {
/// The block is justified and thus finalized.
Justified(J),
/// The block is present, might be finalized if a descendant is justified.
Expand All @@ -84,10 +86,10 @@ pub enum BlockState<J: Justification> {
Unknown,
}

/// The knowledge about the chain state.
pub trait ChainState<J: Justification> {
/// The state of the block.
fn state_of(&self, id: <J::Header as Header>::Identifier) -> BlockState<J>;
/// The knowledge about the chain status.
pub trait ChainStatus<J: Justification> {
/// The status of the block.
fn status_of(&self, id: <J::Header as Header>::Identifier) -> BlockStatus<J>;

/// The header of the best block.
fn best_block(&self) -> J::Header;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use sp_runtime::traits::{CheckedSub, Header as SubstrateHeader, One};

use crate::sync::{BlockIdentifier, Header};

mod status_notifier;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BlockId<H: SubstrateHeader<Number = BlockNumber>> {
hash: H::Hash,
Expand Down
78 changes: 78 additions & 0 deletions finality-aleph/src/sync/substrate/status_notifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::fmt::{Display, Error as FmtError, Formatter};

use aleph_primitives::BlockNumber;
use futures::StreamExt;
use sc_client_api::client::{FinalityNotifications, ImportNotifications};
use sp_runtime::traits::{Block as BlockT, Header as SubstrateHeader};
use tokio::select;

use crate::sync::{substrate::BlockId, ChainStatusNotification, ChainStatusNotifier, Header};

/// What can go wrong when waiting for next chain status notification.
#[derive(Debug)]
pub enum Error {
JustificationStreamClosed,
ImportStreamClosed,
}

impl Display for Error {
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), FmtError> {
use Error::*;
match self {
JustificationStreamClosed => {
write!(f, "finalization notification stream has ended")
}
ImportStreamClosed => {
write!(f, "import notification stream has ended")
}
}
}
}

/// Substrate specific implementation of `ChainStatusNotifier`.
pub struct SubstrateChainStatusNotifier<B>
where
B: BlockT,
{
finality_notifications: FinalityNotifications<B>,
import_notifications: ImportNotifications<B>,
}

impl<B> SubstrateChainStatusNotifier<B>
where
B: BlockT,
{
fn new(
finality_notifications: FinalityNotifications<B>,
import_notifications: ImportNotifications<B>,
) -> Self {
Self {
finality_notifications,
import_notifications,
}
}
}

#[async_trait::async_trait]
impl<B> ChainStatusNotifier<BlockId<B::Header>> for SubstrateChainStatusNotifier<B>
where
B: BlockT,
B::Header: SubstrateHeader<Number = BlockNumber>,
{
type Error = Error;

async fn next(&mut self) -> Result<ChainStatusNotification<BlockId<B::Header>>, Self::Error> {
select! {
maybe_block = self.finality_notifications.next() => {
maybe_block
.map(|block| ChainStatusNotification::BlockFinalized(block.header.id()))
.ok_or(Error::JustificationStreamClosed)
},
maybe_block = self.import_notifications.next() => {
maybe_block
.map(|block| ChainStatusNotification::BlockImported(block.header.id()))
.ok_or(Error::ImportStreamClosed)
}
}
}
}

0 comments on commit 38fde8f

Please sign in to comment.