Skip to content

Commit

Permalink
added trait UnitFinalizationHandler for "low-level" finalization hand…
Browse files Browse the repository at this point in the history
…ling of units
  • Loading branch information
fixxxedpoint committed May 1, 2024
1 parent 19a0d03 commit 12213ae
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 51 deletions.
10 changes: 9 additions & 1 deletion consensus/src/dag/reconstruction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,19 @@ impl<U: Unit> UnitWithParents for ReconstructedUnit<U> {
}
}

impl<D: Data, H: Hasher, K: MultiKeychain> From<ReconstructedUnit<Signed<FullUnit<H, D>, K>>>
for Option<D>
{
fn from(value: ReconstructedUnit<Signed<FullUnit<H, D>, K>>) -> Self {
value.unpack().into_signable().into()
}
}

impl<D: Data, H: Hasher, K: MultiKeychain> From<ReconstructedUnit<Signed<FullUnit<H, D>, K>>>
for OrderedUnit<D, H>
{
fn from(unit: ReconstructedUnit<Signed<FullUnit<H, D>, K>>) -> Self {
let parents = unit.parents().into_values().collect();
let parents = unit.parents().values().cloned().collect();
let unit = unit.unpack();
let creator = unit.creator();
let round = unit.round();
Expand Down
20 changes: 7 additions & 13 deletions consensus/src/extension/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod election;
mod extender;
mod units;

use aleph_bft_types::{BatchOfUnits, FinalizationHandler};
use aleph_bft_types::UnitFinalizationHandler;
use extender::Extender;

/// A struct responsible for executing the Consensus protocol on a local copy of the Dag.
Expand All @@ -16,20 +16,15 @@ use extender::Extender;
///
/// We refer to the documentation https://cardinal-cryptography.github.io/AlephBFT/internals.html
/// Section 5.4 for a discussion of this component.
pub struct Ordering<
H: Hasher,
D: Data,
MK: MultiKeychain,
FH: FinalizationHandler<BatchOfUnits<D, H>>,
> {
pub struct Ordering<H: Hasher, D: Data, MK: MultiKeychain, UFH: UnitFinalizationHandler<D, H>> {
extender: Extender<DagUnit<H, D, MK>>,
finalization_handler: FH,
finalization_handler: UFH,
}

impl<H: Hasher, D: Data, MK: MultiKeychain, FH: FinalizationHandler<BatchOfUnits<D, H>>>
Ordering<H, D, MK, FH>
impl<H: Hasher, D: Data, MK: MultiKeychain, UFH: UnitFinalizationHandler<D, H>>
Ordering<H, D, MK, UFH>
{
pub fn new(finalization_handler: FH) -> Self {
pub fn new(finalization_handler: UFH) -> Self {
let extender = Extender::new();
Ordering {
extender,
Expand All @@ -38,8 +33,7 @@ impl<H: Hasher, D: Data, MK: MultiKeychain, FH: FinalizationHandler<BatchOfUnits
}

fn handle_batch(&mut self, batch: Vec<DagUnit<H, D, MK>>) {
let batch = batch.into_iter().map(|unit| unit.into()).collect();
self.finalization_handler.data_finalized(batch);
self.finalization_handler.batch_finalized(batch);
}

pub fn add_unit(&mut self, unit: DagUnit<H, D, MK>) {
Expand Down
33 changes: 25 additions & 8 deletions consensus/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
Config, Data, DataProvider, Hasher, MultiKeychain, Network, NodeIndex, Receiver, Recipient,
Round, Sender, Signature, SpawnHandle, Terminator, UncheckedSigned,
};
use aleph_bft_types::{BatchOfUnits, FinalizationHandler, NodeMap};
use aleph_bft_types::{FinalizationHandler, NodeMap, UnitFinalizationHandler};
use codec::{Decode, Encode};
use futures::{channel::mpsc, pin_mut, AsyncRead, AsyncWrite, FutureExt, StreamExt};
use futures_timer::Delay;
Expand Down Expand Up @@ -122,7 +122,7 @@ impl<DP: DataProvider, FH: FinalizationHandler<DP::Output>, US: AsyncWrite, UL:
unit_saver: US,
unit_loader: UL,
) -> Self {
LocalIO {
Self {
data_provider,
finalization_handler,
unit_saver,
Expand All @@ -132,7 +132,7 @@ impl<DP: DataProvider, FH: FinalizationHandler<DP::Output>, US: AsyncWrite, UL:

pub fn new_with_unit_finalization_handler<
H: Hasher,
UFH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
UFH: UnitFinalizationHandler<DP::Output, H>,
>(
data_provider: DP,
finalization_handler: UFH,
Expand All @@ -148,6 +148,23 @@ impl<DP: DataProvider, FH: FinalizationHandler<DP::Output>, US: AsyncWrite, UL:
}
}

// pub struct WrappedUnitFinalizationHandler<D, H, UFH> {
// finalization_handler: UFH,
// _phantom: PhantomData<(D, H)>
// }

// impl<D: Data, H: Hasher, UFH: UnitFinalizationHandler<D, H>> UnitFinalizationHandler<D, H> for WrappedUnitFinalizationHandler<D, H, UFH> {
// fn batch_finalized(&mut self, batch: Vec<impl IntoOrderedUnit<D, H>>) {
// self.finalization_handler.batch_finalized(batch)
// }
// }

// impl<H: Hasher, DP: DataProvider, UFH: UnitFinalizationHandler<DP::Output, H>, US: AsyncWrite, UL: AsyncRead>
// LocalIO<DP, WrappedUnitFinalizationHandler<DP::Output, H, UFH>, US, UL>
// {

// }

struct MemberStatus<'a, H: Hasher, D: Data, S: Signature> {
task_queue: &'a TaskQueue<RepeatableTask<H, D, S>>,
not_resolved_parents: &'a HashSet<H::Hash>,
Expand Down Expand Up @@ -582,24 +599,24 @@ where
/// [docs for devs](https://cardinal-cryptography.github.io/AlephBFT/index.html)
/// or the [original paper](https://arxiv.org/abs/1908.05156).
///
/// Please note that in order to fulfill the constraint [`FinalizationHandler<BatchOfUnits<DP::Output, H>>`]
/// Please note that in order to fulfill the constraint [`UnitFinalizationHandler<DP::Output, H>`]
/// it is enough to provide implementation of [`FinalizationHandler<DP::Output>`]. We provide
/// implementation of [`FinalizationHandler<BatchOfUnits<DP::Output, H>>`] for anything that satisfies
/// [`FinalizationHandler<DP::Output>`]. Implementing [`FinalizationHandler<BatchOfUnits<DP::Output, H>>`]
/// implementation of [`UnitFinalizationHandler<DP::Output, H>`] for anything that satisfies
/// [`FinalizationHandler<DP::Output>`]. Implementing [`UnitFinalizationHandler<DP::Output, H>`]
/// directly is considered less stable since it exposes intrisics which might be subject to change.
/// Implement [`FinalizationHandler<DP::Output>`] instead, unless you absolutely know what you are doing.
pub async fn run_session<
H: Hasher,
DP: DataProvider,
FH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
UFH: UnitFinalizationHandler<DP::Output, H>,
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
N: Network<NetworkData<H, DP::Output, MK::Signature, MK::PartialMultisignature>> + 'static,
SH: SpawnHandle,
MK: MultiKeychain,
>(
config: Config,
local_io: LocalIO<DP, FH, US, UL>,
local_io: LocalIO<DP, UFH, US, UL>,
network: N,
keychain: MK,
spawn_handle: SH,
Expand Down
39 changes: 17 additions & 22 deletions consensus/src/runway/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
Config, Data, DataProvider, Hasher, Index, Keychain, MultiKeychain, NodeIndex, Receiver, Round,
Sender, Signature, Signed, SpawnHandle, Terminator, UncheckedSigned,
};
use aleph_bft_types::{BatchOfUnits, FinalizationHandler, Recipient};
use aleph_bft_types::{Recipient, UnitFinalizationHandler};
use futures::{
channel::{mpsc, oneshot},
future::pending,
Expand Down Expand Up @@ -99,19 +99,19 @@ type CollectionResponse<H, D, MK> = UncheckedSigned<
<MK as Keychain>::Signature,
>;

struct Runway<H, D, FH, MK>
struct Runway<H, D, UFH, MK>
where
H: Hasher,
D: Data,
FH: FinalizationHandler<BatchOfUnits<D, H>>,
UFH: UnitFinalizationHandler<D, H>,
MK: MultiKeychain,
{
missing_coords: HashSet<UnitCoord>,
missing_parents: HashSet<H::Hash>,
store: UnitStore<DagUnit<H, D, MK>>,
keychain: MK,
dag: Dag<H, D, MK>,
ordering: Ordering<H, D, MK, FH>,
ordering: Ordering<H, D, MK, UFH>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
notifications_from_alerter: Receiver<ForkingNotification<H, D, MK::Signature>>,
unit_messages_from_network: Receiver<RunwayNotificationIn<H, D, MK::Signature>>,
Expand Down Expand Up @@ -205,13 +205,8 @@ impl<'a, H: Hasher> Display for RunwayStatus<'a, H> {
}
}

struct RunwayConfig<
H: Hasher,
D: Data,
FH: FinalizationHandler<BatchOfUnits<D, H>>,
MK: MultiKeychain,
> {
finalization_handler: FH,
struct RunwayConfig<H: Hasher, D: Data, UFH: UnitFinalizationHandler<D, H>, MK: MultiKeychain> {
finalization_handler: UFH,
backup_units_for_saver: Sender<DagUnit<H, D, MK>>,
backup_units_from_saver: Receiver<DagUnit<H, D, MK>>,
alerts_for_alerter: Sender<Alert<H, D, MK::Signature>>,
Expand All @@ -224,14 +219,14 @@ struct RunwayConfig<
new_units_from_creation: Receiver<SignedUnit<H, D, MK>>,
}

impl<H, D, FH, MK> Runway<H, D, FH, MK>
impl<H, D, UFH, MK> Runway<H, D, UFH, MK>
where
H: Hasher,
D: Data,
FH: FinalizationHandler<BatchOfUnits<D, H>>,
UFH: UnitFinalizationHandler<D, H>,
MK: MultiKeychain,
{
fn new(config: RunwayConfig<H, D, FH, MK>, keychain: MK, validator: Validator<MK>) -> Self {
fn new(config: RunwayConfig<H, D, UFH, MK>, keychain: MK, validator: Validator<MK>) -> Self {
let n_members = keychain.node_count();
let RunwayConfig {
finalization_handler,
Expand Down Expand Up @@ -673,10 +668,10 @@ pub struct RunwayIO<
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider,
FH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
UFH: UnitFinalizationHandler<DP::Output, H>,
> {
pub data_provider: DP,
pub finalization_handler: FH,
pub finalization_handler: UFH,
pub backup_write: W,
pub backup_read: R,
_phantom: PhantomData<(H, MK::Signature)>,
Expand All @@ -688,12 +683,12 @@ impl<
W: AsyncWrite + Send + Sync + 'static,
R: AsyncRead + Send + Sync + 'static,
DP: DataProvider,
FH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
> RunwayIO<H, MK, W, R, DP, FH>
UFH: UnitFinalizationHandler<DP::Output, H>,
> RunwayIO<H, MK, W, R, DP, UFH>
{
pub fn new(
data_provider: DP,
finalization_handler: FH,
finalization_handler: UFH,
backup_write: W,
backup_read: R,
) -> Self {
Expand All @@ -707,9 +702,9 @@ impl<
}
}

pub(crate) async fn run<H, US, UL, MK, DP, FH, SH>(
pub(crate) async fn run<H, US, UL, MK, DP, UFH, SH>(
config: Config,
runway_io: RunwayIO<H, MK, US, UL, DP, FH>,
runway_io: RunwayIO<H, MK, US, UL, DP, UFH>,
keychain: MK,
spawn_handle: SH,
network_io: NetworkIO<H, DP::Output, MK>,
Expand All @@ -719,7 +714,7 @@ pub(crate) async fn run<H, US, UL, MK, DP, FH, SH>(
US: AsyncWrite + Send + Sync + 'static,
UL: AsyncRead + Send + Sync + 'static,
DP: DataProvider,
FH: FinalizationHandler<BatchOfUnits<DP::Output, H>>,
UFH: UnitFinalizationHandler<DP::Output, H>,
MK: MultiKeychain,
SH: SpawnHandle,
{
Expand Down
6 changes: 6 additions & 0 deletions consensus/src/units/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ pub struct FullUnit<H: Hasher, D: Data> {
hash: RwLock<Option<H::Hash>>,
}

impl<H: Hasher, D: Data> From<FullUnit<H, D>> for Option<D> {
fn from(value: FullUnit<H, D>) -> Self {
value.data
}
}

impl<H: Hasher, D: Data> Clone for FullUnit<H, D> {
fn clone(&self) -> Self {
let hash = self.hash.try_read().and_then(|guard| *guard);
Expand Down
22 changes: 16 additions & 6 deletions types/src/dataio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,24 @@ pub struct OrderedUnit<D: Data, H: Hasher> {
pub round: Round,
}

pub type BatchOfUnits<D, H> = Vec<OrderedUnit<D, H>>;
pub trait IntoOrderedUnit<D: Data, H: Hasher>: Into<OrderedUnit<D, H>> + Into<Option<D>> {}

impl<D: Data, H: Hasher, FH: FinalizationHandler<D>> FinalizationHandler<BatchOfUnits<D, H>>
for FH
{
fn data_finalized(&mut self, batch: BatchOfUnits<D, H>) {
impl<D: Data, H: Hasher, I: Into<OrderedUnit<D, H>> + Into<Option<D>>> IntoOrderedUnit<D, H> for I {}

/// The source of finalization of the units that consensus produces.
///
/// The [`UnitFinalizationHandler::batch_finalized`] method is called whenever a batch of units
/// has been finalized, in order of finalization.
pub trait UnitFinalizationHandler<D: Data, H: Hasher>: Sync + Send + 'static {
/// A batch of units, that contains data provided by [DataProvider::get_data], has been finalized.
/// The calls to this function follow the order of finalization.
fn batch_finalized(&mut self, batch: Vec<impl IntoOrderedUnit<D, H>>);
}

impl<D: Data, H: Hasher, FH: FinalizationHandler<D>> UnitFinalizationHandler<D, H> for FH {
fn batch_finalized(&mut self, batch: Vec<impl IntoOrderedUnit<D, H>>) {
for unit in batch {
if let Some(data) = unit.data {
if let Some(data) = unit.into() {
self.data_finalized(data)
}
}
Expand Down
4 changes: 3 additions & 1 deletion types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ pub use aleph_bft_crypto::{
NodeIndex, NodeMap, NodeSubset, PartialMultisignature, PartiallyMultisigned, Signable,
Signature, SignatureError, SignatureSet, Signed, UncheckedSigned,
};
pub use dataio::{BatchOfUnits, DataProvider, FinalizationHandler, OrderedUnit};
pub use dataio::{
DataProvider, FinalizationHandler, IntoOrderedUnit, OrderedUnit, UnitFinalizationHandler,
};
pub use network::{Network, Recipient};
pub use tasks::{SpawnHandle, TaskHandle};

Expand Down

0 comments on commit 12213ae

Please sign in to comment.