Add compute node consensus frontend
kostko committed May 2, 2018
1 parent b9febe3 commit 3a5be89
Showing 13 changed files with 659 additions and 182 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
* gRPC message types and conversion convention established.
* Registry interface / centralized implementation added.
* Make contract client sharable between threads.
+* Use new consensus interface in the compute node.

# 0.1.0-alpha.4

2 changes: 1 addition & 1 deletion common/src/signature.rs
@@ -209,7 +209,7 @@ impl Into<api::Signature> for Signature {
}

/// Signature from a committee node.
-#[derive(Serialize, Deserialize)]
+#[derive(Debug, Serialize, Deserialize)]
pub struct Signed<T> {
/// Signed value.
value: T,
4 changes: 3 additions & 1 deletion compute/Cargo.toml
@@ -12,7 +12,7 @@ ekiden-core = { path = "../core/common", version = "0.1.0-alpha.4" }
ekiden-untrusted = { path = "../core/untrusted", version = "0.1.0-alpha.4" }
ekiden-rpc-client = { path = "../rpc/client", version = "0.1.0-alpha.4" }
ekiden-compute-api = { path = "./api", version = "0.1.0-alpha.4" }
-# TODO: Remove ekiden-consensus-api dependency once we use the new consensus API.
+# TODO: Remove ekiden-consensus-api dependency once we use the new storage API.
ekiden-consensus-api = { path = "../consensus/api", version = "0.1.0-alpha.4" }
ekiden-consensus-base = { path = "../consensus/base", version = "0.1.0-alpha.4" }
ekiden-consensus-dummy = { path = "../consensus/dummy", version = "0.1.0-alpha.4" }
@@ -27,6 +27,8 @@ hyper = "0.11"
tokio-core = "0.1"
grpcio = "0.2.2"
lru-cache = "0.1.1"
log = "0.4"
pretty_env_logger = "0.2"

[build-dependencies]
ekiden-tools = { path = "../tools", version = "0.1.0-alpha.4" }
310 changes: 310 additions & 0 deletions compute/src/consensus.rs
@@ -0,0 +1,310 @@
//! Consensus frontend.
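//!
//! The frontend sits between the compute worker and a `ConsensusBackend`:
//! `propose_batch` queues a computed batch, which the command processor turns
//! into a candidate block, committing to its header with a random nonce and
//! caching the outputs. When the backend signals `CommitmentsReceived`, the
//! stored nonce is revealed and the signed block is submitted (currently
//! unconditionally; see the leader TODO below). Blocks arriving from the
//! backend are then scanned for subscribed call identifiers so that cached
//! outputs can be delivered to `subscribe_call` listeners.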
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::sync::{Arc, Mutex};

use lru_cache::LruCache;

use ekiden_consensus_base::{Block, Commitment, ConsensusBackend, Event, Reveal, Transaction,
BLOCK_SUBMIT_SIGNATURE_CONTEXT};
use ekiden_core::bytes::{B256, H256};
use ekiden_core::contract::batch::{CallBatch, OutputBatch};
use ekiden_core::error::Error;
use ekiden_core::futures::{future, BoxFuture, Future, Stream};
use ekiden_core::futures::sync::{mpsc, oneshot};
use ekiden_core::hash::EncodedHash;
use ekiden_core::signature::{Signed, Signer};

/// Commands for communicating with the consensus frontend from other tasks.
enum Command {
/// Propose a new batch to consensus.
ProposeBatch(CallBatch, OutputBatch, H256),
}

/// Proposed block.
struct ProposedBlock {
/// Nonce used when generating commitment.
nonce: B256,
/// Proposed block we committed to.
block: Block,
}

struct ConsensusFrontendInner {
/// Consensus backend.
backend: Arc<ConsensusBackend + Send + Sync>,
/// Signer for the compute node.
signer: Arc<Signer + Send + Sync>,
/// Command sender.
command_sender: mpsc::UnboundedSender<Command>,
/// Command receiver (until initialized).
command_receiver: Mutex<Option<mpsc::UnboundedReceiver<Command>>>,
/// Currently proposed block.
proposed_block: Mutex<Option<ProposedBlock>>,
/// Recently computed outputs.
recent_outputs: Mutex<LruCache<H256, Vec<u8>>>,
/// Call subscribers (call id -> list of subscribers).
call_subscribers: Mutex<HashMap<H256, Vec<oneshot::Sender<Vec<u8>>>>>,
}

/// Consensus frontend configuration.
#[derive(Clone)]
pub struct ConsensusConfiguration {
/// Consensus backend.
pub backend: Arc<ConsensusBackend + Send + Sync>,
/// Signer for the compute node.
pub signer: Arc<Signer + Send + Sync>,
/// Maximum number of recently computed outputs to cache.
pub max_recent_outputs: usize,
}

/// Compute node consensus frontend.
pub struct ConsensusFrontend {
inner: Arc<ConsensusFrontendInner>,
}

impl ConsensusFrontend {
/// Create a new consensus frontend.
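    ///
    /// A construction sketch (illustrative only; `backend` and `signer` stand
    /// in for concrete `ConsensusBackend` and `Signer` implementations, and
    /// the cache size is an arbitrary choice):
    ///
    /// ```ignore
    /// let frontend = ConsensusFrontend::new(ConsensusConfiguration {
    ///     backend: backend.clone(),
    ///     signer: signer.clone(),
    ///     max_recent_outputs: 1024,
    /// });
    /// ```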
pub fn new(config: ConsensusConfiguration) -> Self {
let (command_sender, command_receiver) = mpsc::unbounded();

Self {
inner: Arc::new(ConsensusFrontendInner {
backend: config.backend,
signer: config.signer,
command_sender,
command_receiver: Mutex::new(Some(command_receiver)),
proposed_block: Mutex::new(None),
recent_outputs: Mutex::new(LruCache::new(config.max_recent_outputs)),
call_subscribers: Mutex::new(HashMap::new()),
}),
}
}

/// Start consensus frontend.
///
    /// Returns a list of futures that should be spawned in the event loop.
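    ///
    /// A driving sketch, assuming a `tokio_core` reactor `core` is available
    /// (errors are logged and discarded so the tasks can be spawned):
    ///
    /// ```ignore
    /// for task in frontend.start() {
    ///     core.handle().spawn(task.map_err(|error| {
    ///         error!("Consensus frontend task failed: {:?}", error);
    ///     }));
    /// }
    /// ```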
pub fn start(&self) -> Vec<BoxFuture<()>> {
// Subscribe to consensus events.
let event_processor: BoxFuture<()> = {
let shared_inner = self.inner.clone();

Box::new(
self.inner
.backend
.get_events()
.for_each(move |event| -> BoxFuture<()> {
match event {
Event::CommitmentsReceived => {
// Ensure we have proposed a block in the current round.
let proposed_block = {
let mut proposed_block =
shared_inner.proposed_block.lock().unwrap();

match proposed_block.take() {
Some(proposed_block) => proposed_block,
None => {
trace!(
"Ignoring commitments as we didn't propose any block"
);
return Box::new(future::ok(()));
}
}
};

info!("Submitting reveal and block");

// Generate and submit reveal.
let reveal = Reveal::new(
&shared_inner.signer,
&proposed_block.nonce,
&proposed_block.block.header,
);
let result = shared_inner.backend.reveal(reveal);

// If we are a leader, also submit the block.
// TODO: Only submit block if we are a leader.
let shared_inner = shared_inner.clone();
Box::new(result.and_then(move |_| {
// Sign and submit block.
let signed_block = Signed::sign(
&shared_inner.signer,
&BLOCK_SUBMIT_SIGNATURE_CONTEXT,
proposed_block.block,
);
shared_inner.backend.submit(signed_block)
}))
}
Event::RoundFailed(error) => {
error!("Round has failed: {:?}", error);

// TODO: How to handle this? Someone needs to retry?

                                // If the round has failed and we have proposed a block, be sure to
                                // clean up. Note that the outputs from the batch remain cached in
                                // recent_outputs, but nothing is emitted for them unless they are
                                // persisted in a block.
{
let mut proposed_block = shared_inner.proposed_block.lock().unwrap();
drop(proposed_block.take());
}

Box::new(future::ok(()))
}
}
}),
)
};

// Subscribe to consensus blocks.
let block_processor: BoxFuture<()> = {
let shared_inner = self.inner.clone();

Box::new(
self.inner
.backend
.get_blocks()
.for_each(move |block| -> BoxFuture<()> {
info!(
"Received new block at round {:?} from consensus backend",
block.header.round
);

// Check if any subscribed transactions have been included in a block.
let mut call_subscribers = shared_inner.call_subscribers.lock().unwrap();
let mut recent_outputs = shared_inner.recent_outputs.lock().unwrap();
for transaction in &block.transactions {
let call_id = transaction.input.get_encoded_hash();
                            // We can only generate replies for calls whose outputs we computed
                            // recently, since the outputs themselves are not included in blocks.
if let Some(output) = recent_outputs.get_mut(&call_id) {
if let Some(senders) = call_subscribers.remove(&call_id) {
for sender in senders {
                                        // Explicitly ignore send errors as the receiver may have gone away.
drop(sender.send(output.clone()));
}
}
}
}

Box::new(future::ok(()))
}),
)
};

// Receive proposed batches from worker.
let command_receiver = self.inner
.command_receiver
.lock()
.unwrap()
.take()
.expect("start already called");
let command_processor: BoxFuture<()> = {
let shared_inner = self.inner.clone();

Box::new(
command_receiver
.map_err(|_| Error::new("command channel closed"))
.for_each(move |command| -> BoxFuture<()> {
match command {
Command::ProposeBatch(calls, outputs, state_root) => {
// Fetch latest block.
let latest_block = shared_inner.backend.get_latest_block();
let shared_inner = shared_inner.clone();

Box::new(latest_block.and_then(move |child| -> BoxFuture<()> {
// Create block from result batches.
let mut block = Block::new_parent_of(&child);
// We currently just assume that the computation group is fixed.
// TODO: Get computation group from some backend.
block.computation_group = child.computation_group;
block.header.state_root = state_root;

// Generate a list of transactions from call/output batches.
{
let mut outputs = outputs;
let mut recent_outputs =
shared_inner.recent_outputs.lock().unwrap();
for (call, output) in calls.iter().zip(outputs.drain(..)) {
block.transactions.push(Transaction {
input: call.clone(),
output_hash: output.get_encoded_hash(),
});

recent_outputs.insert(call.get_encoded_hash(), output);
}
}

block.update();

info!(
"Proposing new block with {} transaction(s)",
block.transactions.len()
);

// Generate commitment.
let nonce = B256::random();
let commitment = Commitment::new(
&shared_inner.signer,
&nonce,
&block.header,
);

// Store proposed block.
{
let mut proposed_block =
shared_inner.proposed_block.lock().unwrap();

// Ensure no block has been proposed in the current round.
if proposed_block.is_some() {
// TODO: Build a new candidate batch for the next round.
                                            // TODO: What if the computation group changes in between?
warn!("Dropping proposed batch received while a block has already been proposed");
return Box::new(future::ok(()));
}

proposed_block
.get_or_insert(ProposedBlock { nonce, block });
}

// Commit to block.
shared_inner.backend.commit(commitment)
}))
}
}
}),
)
};

vec![event_processor, block_processor, command_processor]
}

/// Propose a new batch to consensus.
///
/// This method should be called after a batch of calls has been computed and its outputs
/// are ready to be committed.
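    ///
    /// A usage sketch, assuming the worker has just executed a batch and
    /// produced `calls`, `outputs` and the resulting `state_root`:
    ///
    /// ```ignore
    /// // Fire-and-forget: the batch is queued to the command processor task.
    /// frontend.propose_batch(calls, outputs, state_root);
    /// ```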
pub fn propose_batch(&self, calls: CallBatch, outputs: OutputBatch, state_root: H256) {
drop(
self.inner
.command_sender
.unbounded_send(Command::ProposeBatch(calls, outputs, state_root)),
);
}

    /// Subscribe to being notified when a specific call is included in a block.
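    ///
    /// A sketch of awaiting a call result (subscribe before the block is
    /// expected, since replies can only be generated for recently computed
    /// outputs; `core` is an assumed `tokio_core` reactor):
    ///
    /// ```ignore
    /// let call_id = call.get_encoded_hash();
    /// let response = frontend.subscribe_call(call_id);
    /// // Resolves with the cached output once the call appears in a block.
    /// let output: Vec<u8> = core.run(response).unwrap();
    /// ```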
pub fn subscribe_call(&self, call_id: H256) -> oneshot::Receiver<Vec<u8>> {
let (response_sender, response_receiver) = oneshot::channel();
{
let mut call_subscribers = self.inner.call_subscribers.lock().unwrap();
match call_subscribers.entry(call_id) {
Entry::Occupied(mut entry) => {
entry.get_mut().push(response_sender);
}
Entry::Vacant(entry) => {
entry.insert(vec![response_sender]);
}
}
}

response_receiver
}
}