Skip to content

Commit

Permalink
light-client: expose latest_trusted on Supervisor Handle (#394)
Browse files Browse the repository at this point in the history
* Improve semantics of supervisor events

* Implement latest_trusted on Supervisor

* Remove result type alias

* Update changes
  • Loading branch information
xla authored Jun 30, 2020
1 parent 4118ef1 commit ebda75b
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 69 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## pending

Light Client:

- Expose latest_trusted from Supervisor Handle ([#394])

## [0.14.1] (2020-06-23)

- Update `prost-amino`/`prost-amino-derive` to v0.6 ([#367])
Expand Down
3 changes: 3 additions & 0 deletions light-client/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub enum ErrorKind {
#[error("store error")]
Store,

#[error("no primary")]
NoPrimary,

#[error("no witnesses")]
NoWitnesses,

Expand Down
138 changes: 69 additions & 69 deletions light-client/src/supervisor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
use contracts::pre;
use crossbeam_channel as channel;

use tendermint::evidence::{ConflictingHeadersEvidence, Evidence};

use crate::{
bail,
callback::Callback,
Expand All @@ -10,32 +15,18 @@ use crate::{
types::{Height, LightBlock, PeerId, Status},
};

use tendermint::evidence::{ConflictingHeadersEvidence, Evidence};

use contracts::pre;
use crossbeam_channel as channel;

/// Type alias for readability
pub type VerificationResult = Result<LightBlock, Error>;

/// Events which are exchanged between the `Supervisor` and its `Handle`s.
/// Input events sent by the [`Handle`]s to the [`Supervisor`]. They carry a [`Callback`] which is
/// used to communicate back the responses of the requests.
#[derive(Debug)]
pub enum Event {
// Inputs
enum HandleInput {
/// Terminate the supervisor process
Terminate(Callback<()>),
/// Verify to the highest height, call the provided callback with result
VerifyToHighest(Callback<VerificationResult>),
VerifyToHighest(Callback<Result<LightBlock, Error>>),
/// Verify to the given height, call the provided callback with result
VerifyToTarget(Height, Callback<VerificationResult>),

// Outputs
/// The supervisor has terminated
Terminated,
/// The verification has succeded
VerificationSuccess(Box<LightBlock>),
/// The verification has failed
VerificationFailure(Error),
VerifyToTarget(Height, Callback<Result<LightBlock, Error>>),
/// Get the latest trusted block.
LatestTrusted(Callback<Result<Option<LightBlock>, Error>>),
}

/// An light client `Instance` packages a `LightClient` together with its `State`.
Expand Down Expand Up @@ -108,9 +99,9 @@ pub struct Supervisor {
/// Reporter of fork evidence
evidence_reporter: Box<dyn EvidenceReporter>,
/// Channel through which to reply to `Handle`s
sender: channel::Sender<Event>,
sender: channel::Sender<HandleInput>,
/// Channel through which to receive events from the `Handle`s
receiver: channel::Receiver<Event>,
receiver: channel::Receiver<HandleInput>,
}

impl std::fmt::Debug for Supervisor {
Expand All @@ -131,7 +122,7 @@ impl Supervisor {
fork_detector: impl ForkDetector + 'static,
evidence_reporter: impl EvidenceReporter + 'static,
) -> Self {
let (sender, receiver) = channel::unbounded::<Event>();
let (sender, receiver) = channel::unbounded::<HandleInput>();

Self {
peers,
Expand All @@ -142,6 +133,17 @@ impl Supervisor {
}
}

/// Create a new handle to this supervisor.
pub fn handle(&mut self) -> Handle {
Handle::new(self.sender.clone())
}

#[pre(self.peers.primary().is_some())]
fn latest_trusted(&self) -> Result<Option<LightBlock>, Error> {
let primary = self.peers.primary().ok_or_else(|| ErrorKind::NoPrimary)?;
Ok(primary.latest_trusted())
}

/// Verify to the highest block.
#[pre(self.peers.primary().is_some())]
pub fn verify_to_highest(&mut self) -> Result<LightBlock, Error> {
Expand Down Expand Up @@ -277,11 +279,6 @@ impl Supervisor {
.detect_forks(light_block, &trusted_state, self.peers.witnesses())
}

/// Create a new handle to this supervisor.
pub fn handle(&mut self) -> Handle {
Handle::new(self.sender.clone())
}

/// Run the supervisor event loop in the same thread.
///
/// This method should typically be called within a new thread with `std::thread::spawn`.
Expand All @@ -290,21 +287,22 @@ impl Supervisor {
let event = self.receiver.recv().unwrap();

match event {
Event::Terminate(callback) => {
HandleInput::LatestTrusted(callback) => {
let outcome = self.latest_trusted();
callback.call(outcome);
}
HandleInput::Terminate(callback) => {
callback.call(());
return;
}
Event::VerifyToTarget(height, callback) => {
HandleInput::VerifyToTarget(height, callback) => {
let outcome = self.verify_to_target(height);
callback.call(outcome);
}
Event::VerifyToHighest(callback) => {
HandleInput::VerifyToHighest(callback) => {
let outcome = self.verify_to_highest();
callback.call(outcome);
}
_ => {
// TODO: Log/record unexpected event
}
}
}
}
Expand All @@ -313,51 +311,58 @@ impl Supervisor {
/// A handle to a `Supervisor` which allows to communicate with
/// the supervisor across thread boundaries via message passing.
pub struct Handle {
sender: channel::Sender<Event>,
sender: channel::Sender<HandleInput>,
}

impl Handle {
/// Crate a new handle that sends events to the supervisor via
/// the given channel. For internal use only.
pub fn new(sender: channel::Sender<Event>) -> Self {
fn new(sender: channel::Sender<HandleInput>) -> Self {
Self { sender }
}

/// Get latest trusted block from the [`Supervisor`].
pub fn latest_trusted(&mut self) -> Result<Option<LightBlock>, Error> {
let (sender, receiver) = channel::bounded::<Result<Option<LightBlock>, Error>>(1);

let callback = Callback::new(move |result| {
sender.send(result).unwrap();
});

// TODO(xla): Transform crossbeam errors into proper domain errors.
self.sender
.send(HandleInput::LatestTrusted(callback))
.unwrap();

// TODO(xla): Transform crossbeam errors into proper domain errors.
receiver.recv().unwrap()
}

/// Verify to the highest block.
pub fn verify_to_highest(&mut self) -> VerificationResult {
self.verify(Event::VerifyToHighest)
pub fn verify_to_highest(&mut self) -> Result<LightBlock, Error> {
self.verify(HandleInput::VerifyToHighest)
}

/// Verify to the block at the given height.
pub fn verify_to_target(&mut self, height: Height) -> VerificationResult {
self.verify(|callback| Event::VerifyToTarget(height, callback))
pub fn verify_to_target(&mut self, height: Height) -> Result<LightBlock, Error> {
self.verify(|callback| HandleInput::VerifyToTarget(height, callback))
}

/// Verify either to the latest block (if `height == None`) or to a given block (if `height == Some(height)`).
fn verify(
&mut self,
make_event: impl FnOnce(Callback<VerificationResult>) -> Event,
) -> VerificationResult {
let (sender, receiver) = channel::bounded::<Event>(1);
make_event: impl FnOnce(Callback<Result<LightBlock, Error>>) -> HandleInput,
) -> Result<LightBlock, Error> {
let (sender, receiver) = channel::bounded::<Result<LightBlock, Error>>(1);

let callback = Callback::new(move |result| {
// We need to create an event here
let event = match result {
Ok(header) => Event::VerificationSuccess(Box::new(header)),
Err(err) => Event::VerificationFailure(err),
};

sender.send(event).unwrap();
sender.send(result).unwrap();
});

let event = make_event(callback);
self.sender.send(event).unwrap();

match receiver.recv().unwrap() {
Event::VerificationSuccess(header) => Ok(*header),
Event::VerificationFailure(err) => Err(err),
_ => todo!(),
}
receiver.recv().unwrap()
}

/// Async version of `verify_to_highest`.
Expand All @@ -366,9 +371,9 @@ impl Handle {
/// verification result.
pub fn verify_to_highest_async(
&mut self,
callback: impl FnOnce(VerificationResult) -> () + Send + 'static,
callback: impl FnOnce(Result<LightBlock, Error>) -> () + Send + 'static,
) {
let event = Event::VerifyToHighest(Callback::new(callback));
let event = HandleInput::VerifyToHighest(Callback::new(callback));
self.sender.send(event).unwrap();
}

Expand All @@ -379,27 +384,22 @@ impl Handle {
pub fn verify_to_target_async(
&mut self,
height: Height,
callback: impl FnOnce(VerificationResult) -> () + Send + 'static,
callback: impl FnOnce(Result<LightBlock, Error>) -> () + Send + 'static,
) {
let event = Event::VerifyToTarget(height, Callback::new(callback));
let event = HandleInput::VerifyToTarget(height, Callback::new(callback));
self.sender.send(event).unwrap();
}

/// Terminate the underlying supervisor.
pub fn terminate(&mut self) {
let (sender, receiver) = channel::bounded::<Event>(1);
let (sender, receiver) = channel::bounded::<()>(1);

let callback = Callback::new(move |_| {
sender.send(Event::Terminated).unwrap();
sender.send(()).unwrap();
});

self.sender.send(Event::Terminate(callback)).unwrap();
self.sender.send(HandleInput::Terminate(callback)).unwrap();

while let Ok(event) = receiver.recv() {
match event {
Event::Terminated => return,
_ => continue,
}
}
receiver.recv().unwrap()
}
}

0 comments on commit ebda75b

Please sign in to comment.