From 1319c308ccea80637ee51843ff984ba7b9ddfa3f Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Mon, 30 Apr 2018 16:01:39 +0200 Subject: [PATCH] Add compute node consensus frontend --- CHANGELOG.md | 1 + common/Cargo.toml | 1 + common/src/futures.rs | 22 ++ common/src/signature.rs | 2 +- compute/Cargo.toml | 5 +- compute/src/consensus.rs | 464 +++++++++++++++++++++++++++++++ compute/src/main.rs | 110 ++++++-- compute/src/node.rs | 52 +++- compute/src/server.rs | 51 ++-- compute/src/worker.rs | 283 ++++++++----------- consensus/base/src/backend.rs | 8 +- consensus/dummy/Cargo.toml | 1 + consensus/dummy/src/backend.rs | 221 ++++++++++----- consensus/dummy/src/lib.rs | 3 + consensus/dummy/tests/backend.rs | 19 +- 15 files changed, 944 insertions(+), 299 deletions(-) create mode 100644 compute/src/consensus.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index da88b496a00..316a3dfff80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ * gRPC message types and conversion convention established. * Registry interface / centralized implementation added. * Make contract client sharable between threads. +* Use new consensus interface in the compute node. # 0.1.0-alpha.4 diff --git a/common/Cargo.toml b/common/Cargo.toml index 65ba87be81d..6aa80cc181a 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -30,6 +30,7 @@ futures-sgx = { git = "https://github.com/ekiden/futures-rs" } [target.'cfg(not(target_env = "sgx"))'.dependencies] rand = "0.4.2" futures = "0.1" +futures-cpupool = "0.1" [dev-dependencies] serde_json = { git = "https://github.com/ekiden/json" } diff --git a/common/src/futures.rs b/common/src/futures.rs index 3c04564a039..a2562737398 100644 --- a/common/src/futures.rs +++ b/common/src/futures.rs @@ -1,7 +1,11 @@ //! Future types used in Ekiden. extern crate futures as extern_futures; +#[cfg(not(target_env = "sgx"))] +pub extern crate futures_cpupool as cpupool; pub use self::extern_futures::*; +#[cfg(not(target_env = "sgx"))] +use self::future::Executor as OldExecutor; use super::error::Error; @@ -10,3 +14,21 @@ pub type BoxFuture = Box + Send>; /// Stream type for use in Ekiden. pub type BoxStream = Box + Send>; + +/// A task executor. +/// +/// # Note +/// +/// Once we transition to futures 0.2+ this trait will no longer be needed as there +/// is already a similar trait there. +pub trait Executor { + /// Spawn the given task, polling it until completion. + fn spawn(&mut self, f: Box + Send>); +} + +#[cfg(not(target_env = "sgx"))] +impl Executor for cpupool::CpuPool { + fn spawn(&mut self, f: Box + Send>) { + self.execute(f).unwrap(); + } +} diff --git a/common/src/signature.rs b/common/src/signature.rs index 5c2add11a67..cce542922d4 100644 --- a/common/src/signature.rs +++ b/common/src/signature.rs @@ -209,7 +209,7 @@ impl Into for Signature { } /// Signature from a committee node. -#[derive(Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize)] pub struct Signed { /// Signed value. value: T, diff --git a/compute/Cargo.toml b/compute/Cargo.toml index 37812deb89e..93d05906600 100644 --- a/compute/Cargo.toml +++ b/compute/Cargo.toml @@ -12,7 +12,7 @@ ekiden-core = { path = "../core/common", version = "0.1.0-alpha.4" } ekiden-untrusted = { path = "../core/untrusted", version = "0.1.0-alpha.4" } ekiden-rpc-client = { path = "../rpc/client", version = "0.1.0-alpha.4" } ekiden-compute-api = { path = "./api", version = "0.1.0-alpha.4" } -# TODO: Remove ekiden-consensus-api depndency once we use the new consensus API. 
+# TODO: Remove ekiden-consensus-api depndency once we use the new storage API. ekiden-consensus-api = { path = "../consensus/api", version = "0.1.0-alpha.4" } ekiden-consensus-base = { path = "../consensus/base", version = "0.1.0-alpha.4" } ekiden-consensus-dummy = { path = "../consensus/dummy", version = "0.1.0-alpha.4" } @@ -27,6 +27,9 @@ hyper = "0.11" tokio-core = "0.1" grpcio = "0.2.2" lru-cache = "0.1.1" +log = "0.4" +pretty_env_logger = "0.2" +futures-timer = "0.1.1" [build-dependencies] ekiden-tools = { path = "../tools", version = "0.1.0-alpha.4" } diff --git a/compute/src/consensus.rs b/compute/src/consensus.rs new file mode 100644 index 00000000000..748f4e08869 --- /dev/null +++ b/compute/src/consensus.rs @@ -0,0 +1,464 @@ +//! Consensus frontend. +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::sync::{Arc, Mutex}; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::SeqCst; +use std::time::{Duration, Instant}; + +use futures_timer::Interval; +use lru_cache::LruCache; + +use ekiden_consensus_base::{Block, Commitment, ConsensusBackend, Event, Reveal, Transaction, + BLOCK_SUBMIT_SIGNATURE_CONTEXT}; +use ekiden_core::bytes::{B256, H256}; +use ekiden_core::contract::batch::CallBatch; +use ekiden_core::error::{Error, Result}; +use ekiden_core::futures::{future, BoxFuture, Executor, Future, Stream}; +use ekiden_core::futures::sync::{mpsc, oneshot}; +use ekiden_core::hash::EncodedHash; +use ekiden_core::signature::{Signed, Signer}; + +use super::worker::{ComputedBatch, Worker}; + +/// Commands for communicating with the consensus frontend from other tasks. +enum Command { + /// Append to current batch. + AppendBatch(CallBatch), +} + +/// Proposed block. +struct ProposedBlock { + /// Nonce used when generating commitment. + nonce: B256, + /// Proposed block we committed to. + block: Block, +} + +/// Call batch that is being constructed. +struct PendingBatch { + /// Instant when first item was queued in the batch. + start: Instant, + /// Batch of contract calls. + calls: CallBatch, +} + +impl Default for PendingBatch { + fn default() -> Self { + Self { + start: Instant::now(), + calls: CallBatch::default(), + } + } +} + +struct ConsensusFrontendInner { + /// Consensus backend. + backend: Arc, + /// Signer for the compute node. + signer: Arc, + /// Worker that can process batches. + worker: Arc, + /// Command sender. + command_sender: mpsc::UnboundedSender, + /// Command receiver (until initialized). + command_receiver: Mutex>>, + /// Current batch. + current_batch: Mutex>, + /// Maximum batch size. + max_batch_size: usize, + /// Maximum batch timeout. + max_batch_timeout: Duration, + /// Flag if a batch is currently processing. + batch_processing: AtomicBool, + /// Currently proposed block. + proposed_block: Mutex>, + /// Recently computed outputs. + recent_outputs: Mutex>>, + /// Call subscribers (call id -> list of subscribers). + call_subscribers: Mutex>>>>, +} + +/// Consensus frontend configuration. +#[derive(Clone)] +pub struct ConsensusConfiguration { + /// Consensus backend. + pub backend: Arc, + /// Signer for the compute node. + pub signer: Arc, + /// Maximum batch size. + pub max_batch_size: usize, + /// Maximum batch timeout. + pub max_batch_timeout: u64, +} + +/// Compute node consensus frontend. +pub struct ConsensusFrontend { + inner: Arc, +} + +impl ConsensusFrontend { + /// Create a new consensus frontend. 
+ pub fn new(config: ConsensusConfiguration, worker: Arc) -> Self { + let (command_sender, command_receiver) = mpsc::unbounded(); + + Self { + inner: Arc::new(ConsensusFrontendInner { + backend: config.backend, + signer: config.signer, + worker, + command_sender, + command_receiver: Mutex::new(Some(command_receiver)), + current_batch: Mutex::new(None), + max_batch_size: config.max_batch_size, + max_batch_timeout: Duration::from_millis(config.max_batch_timeout), + batch_processing: AtomicBool::new(false), + proposed_block: Mutex::new(None), + recent_outputs: Mutex::new(LruCache::new(config.max_batch_size * 10)), + call_subscribers: Mutex::new(HashMap::new()), + }), + } + } + + /// Start consensus frontend. + pub fn start(&self, executor: &mut Executor) { + // Subscribe to consensus events. + executor.spawn({ + let inner = self.inner.clone(); + + Box::new( + self.inner + .backend + .get_events() + .for_each(move |event| match event { + Event::CommitmentsReceived => { + Self::handle_commitments_received(inner.clone()) + } + Event::RoundFailed(error) => { + Self::handle_round_failed(inner.clone(), error) + } + }) + .then(|_| future::ok(())), + ) + }); + + // Subscribe to consensus blocks. + executor.spawn({ + let inner = self.inner.clone(); + + Box::new( + self.inner + .backend + .get_blocks() + .for_each(move |block| Self::handle_block(inner.clone(), block)) + .then(|_| future::ok(())), + ) + }); + + // Receive proposed batches from worker. + let command_receiver = self.inner + .command_receiver + .lock() + .unwrap() + .take() + .expect("start already called"); + executor.spawn({ + let inner = self.inner.clone(); + + Box::new( + command_receiver + .map_err(|_| Error::new("command channel closed")) + .for_each(move |command| match command { + Command::AppendBatch(calls) => { + Self::handle_append_batch(inner.clone(), calls) + } + }) + .then(|_| future::ok(())), + ) + }); + + // Periodically check for batches. + executor.spawn({ + let inner = self.inner.clone(); + + Box::new( + Interval::new(self.inner.max_batch_timeout) + .map_err(|error| Error::from(error)) + .for_each(move |_| { + // Check if batch is ready to be sent for processing. + Self::check_and_process_current_batch(inner.clone()) + }) + .then(|_| future::ok(())), + ) + }); + } + + /// Handle append batch command. + fn handle_append_batch( + inner: Arc, + mut calls: CallBatch, + ) -> BoxFuture<()> { + // Ignore empty batches. + if calls.is_empty() { + return Box::new(future::ok(())); + } + + // Append to batch. + { + let mut current_batch = inner.current_batch.lock().unwrap(); + let current_batch = current_batch.get_or_insert_with(|| PendingBatch::default()); + current_batch.calls.append(&mut calls); + } + + // Check if batch is ready to be sent for processing. + Self::check_and_process_current_batch(inner.clone()) + } + + /// Handle commitments received event from consensus backend. + fn handle_commitments_received(inner: Arc) -> BoxFuture<()> { + // Ensure we have proposed a block in the current round. + let proposed_block_guard = inner.proposed_block.lock().unwrap(); + if proposed_block_guard.is_none() { + trace!("Ignoring commitments as we didn't propose any block"); + return Box::new(future::ok(())); + } + + let proposed_block = proposed_block_guard.as_ref().unwrap(); + + info!("Submitting reveal and block"); + + // Generate and submit reveal. 
+ let reveal = Reveal::new( + &inner.signer, + &proposed_block.nonce, + &proposed_block.block.header, + ); + let result = inner.backend.reveal(reveal); + + // If we are a leader, also submit the block. + // TODO: Only submit block if we are a leader. + let block = proposed_block.block.clone(); + let inner = inner.clone(); + Box::new(result.and_then(move |_| { + // Sign and submit block. + let signed_block = Signed::sign(&inner.signer, &BLOCK_SUBMIT_SIGNATURE_CONTEXT, block); + inner.backend.submit(signed_block) + })) + } + + /// Handle round failed event from consensus backend. + fn handle_round_failed(inner: Arc, error: Error) -> BoxFuture<()> { + error!("Round has failed: {:?}", error); + + // TODO: Should we move all failed calls back into the current batch? + + // If the round has failed and we have proposed a block, be sure to clean + // up. Note that the transactions from the batch will still be stored in + // recent_outputs, but we don't emit anything if these are not persisted + // in a block. + { + let mut proposed_block = inner.proposed_block.lock().unwrap(); + drop(proposed_block.take()); + } + + // Clear batch processing flag. + inner.batch_processing.store(false, SeqCst); + + Box::new(future::ok(())) + } + + /// Handle new block from consensus backend. + fn handle_block(inner: Arc, block: Block) -> BoxFuture<()> { + info!( + "Received new block at round {:?} from consensus backend", + block.header.round + ); + + // Check if this is a block for the same round that we proposed. + { + let mut proposed_block = inner.proposed_block.lock().unwrap(); + let should_clear = { + if let Some(ref proposed_block) = *proposed_block { + proposed_block.block.header.round >= block.header.round + } else { + false + } + }; + + if should_clear { + info!("Block is for the same round or newer as recently proposed block"); + + // Clear proposed block. + proposed_block.take(); + // Clear batch processing flag. + inner.batch_processing.store(false, SeqCst); + } + } + + // Check if any subscribed transactions have been included in a block. + let mut call_subscribers = inner.call_subscribers.lock().unwrap(); + let mut recent_outputs = inner.recent_outputs.lock().unwrap(); + for transaction in &block.transactions { + let call_id = transaction.input.get_encoded_hash(); + // We can only generate replies for outputs that we recently computed as outputs + // themselves are not included in blocks. + if let Some(output) = recent_outputs.get_mut(&call_id) { + if let Some(senders) = call_subscribers.remove(&call_id) { + for sender in senders { + // Explicitly ignore send errors as the receiver may have gone. + drop(sender.send(output.clone())); + } + } + } + } + + Box::new(future::ok(())) + } + + /// Check if we need to send the current batch for processing. + /// + /// The batch is then sent for processing if either: + /// * Number of calls it contains reaches `max_batch_size`. + /// * More than `max_batch_timeout` time elapsed since batch was created. + /// * No other batch is currently processing. + fn check_and_process_current_batch(inner: Arc) -> BoxFuture<()> { + // First check if a batch is already being processed. + if inner.batch_processing.load(SeqCst) { + return Box::new(future::ok(())); + } + + // No batch yet, check if we should process. 
+ let mut current_batch = inner.current_batch.lock().unwrap(); + let should_process = if let Some(ref current_batch) = *current_batch { + current_batch.calls.len() >= inner.max_batch_size + || current_batch.start.elapsed() >= inner.max_batch_timeout + } else { + false + }; + + if should_process { + // We have decided to process the current batch. + inner.batch_processing.store(true, SeqCst); + + // Take calls from current batch for processing. We only take up to max_batch_size + // and leave the rest for the next batch, resetting the timestamp. + let mut calls = current_batch.take().unwrap().calls; + if calls.len() > inner.max_batch_size { + let mut remaining = calls.split_off(inner.max_batch_size); + let current_batch = current_batch.get_or_insert_with(|| PendingBatch::default()); + current_batch.calls.append(&mut remaining); + } + + // Fetch the latest block and request the worker to process the batch. + let inner = inner.clone(); + Box::new(inner.backend.get_latest_block().and_then(move |block| { + // Send block and channel to worker. + let process_batch = inner.worker.contract_call_batch(calls, block); + + // After the batch is processed, propose the batch. + process_batch + .map_err(|_| Error::new("channel closed")) + .and_then(move |result| Self::propose_batch(inner, result)) + })) + } else { + Box::new(future::ok(())) + } + } + + /// Propose a batch to consensus backend. + fn propose_batch( + inner: Arc, + computed_batch: Result, + ) -> BoxFuture<()> { + // Check result of batch computation. + let mut computed_batch = match computed_batch { + Ok(computed_batch) => computed_batch, + Err(error) => { + error!("Failed to process batch: {}", error.message); + // TODO: Should we move all failed calls back into the current batch? + + // Clear batch processing flag. + inner.batch_processing.store(false, SeqCst); + return Box::new(future::ok(())); + } + }; + + // Create block from result batches. + let mut block = Block::new_parent_of(&computed_batch.block); + // We currently just assume that the computation group is fixed. + // TODO: Get computation group from some backend. + block.computation_group = computed_batch.block.computation_group; + block.header.state_root = computed_batch.new_state_root; + + // Generate a list of transactions from call/output batches. + { + let mut recent_outputs = inner.recent_outputs.lock().unwrap(); + for (call, output) in computed_batch + .calls + .iter() + .zip(computed_batch.outputs.drain(..)) + { + block.transactions.push(Transaction { + input: call.clone(), + output_hash: output.get_encoded_hash(), + }); + + recent_outputs.insert(call.get_encoded_hash(), output); + } + } + + block.update(); + + info!( + "Proposing new block with {} transaction(s)", + block.transactions.len() + ); + + // Generate commitment. + let nonce = B256::random(); + let commitment = Commitment::new(&inner.signer, &nonce, &block.header); + + // Store proposed block. + { + let mut proposed_block = inner.proposed_block.lock().unwrap(); + + // Ensure no block was previously proposed. This should never happen as we always + // check the batch_processing flag before processing a batch. + assert!( + proposed_block.is_none(), + "tried to overwrite proposed block" + ); + + proposed_block.get_or_insert(ProposedBlock { nonce, block }); + } + + // Commit to block. + inner.backend.commit(commitment) + } + + /// Append contract calls to current batch for eventual processing. 
+ pub fn append_batch(&self, calls: CallBatch) { + self.inner + .command_sender + .unbounded_send(Command::AppendBatch(calls)) + .unwrap(); + } + + /// Subscribe to being notified when specific call is included in a block. + pub fn subscribe_call(&self, call_id: H256) -> oneshot::Receiver> { + let (response_sender, response_receiver) = oneshot::channel(); + { + let mut call_subscribers = self.inner.call_subscribers.lock().unwrap(); + match call_subscribers.entry(call_id) { + Entry::Occupied(mut entry) => { + entry.get_mut().push(response_sender); + } + Entry::Vacant(entry) => { + entry.insert(vec![response_sender]); + } + } + } + + response_receiver + } +} diff --git a/compute/src/main.rs b/compute/src/main.rs index 472852b0063..d918b137599 100644 --- a/compute/src/main.rs +++ b/compute/src/main.rs @@ -3,16 +3,18 @@ extern crate sgx_types; extern crate base64; +extern crate futures_timer; extern crate grpcio; +extern crate hyper; +#[macro_use] +extern crate log; extern crate lru_cache; +#[macro_use] +extern crate prometheus; extern crate protobuf; extern crate reqwest; extern crate thread_local; -extern crate hyper; -#[macro_use] -extern crate prometheus; - extern crate ekiden_compute_api; extern crate ekiden_consensus_api; extern crate ekiden_consensus_base; @@ -26,17 +28,32 @@ mod handlers; mod server; mod worker; mod node; +mod consensus; + +// Everything above should be moved into a library, while everything below should be in the binary. #[macro_use] extern crate clap; +extern crate pretty_env_logger; extern crate ekiden_consensus_dummy; +use std::fs::File; +use std::io::{Read, Write}; use std::path::Path; +use std::sync::Arc; use std::thread; use clap::{App, Arg}; +use log::LevelFilter; + +use ekiden_consensus_base::{CommitteeNode, Role}; +use ekiden_core::ring::rand::SystemRandom; +use ekiden_core::ring::signature::Ed25519KeyPair; +use ekiden_core::signature::{InMemorySigner, Signer}; +use ekiden_core::untrusted; +use self::consensus::ConsensusConfiguration; use self::ias::{IASConfiguration, SPID}; use self::node::{ComputeNode, ComputeNodeConfiguration}; use self::worker::{KeyManagerConfiguration, WorkerConfiguration}; @@ -91,12 +108,6 @@ fn main() { .takes_value(true) .default_value("9003"), ) - .arg( - Arg::with_name("consensus-backend") - .long("consensus-backend") - .takes_value(true) - .default_value("dummy") - ) .arg( Arg::with_name("consensus-host") .long("consensus-host") @@ -113,15 +124,15 @@ fn main() { .arg( Arg::with_name("grpc-threads") .long("grpc-threads") - .help("Number of threads to use in the GRPC server's HTTP server. Multiple threads only allow requests to be batched up. Requests will not be processed concurrently.") - .default_value("1") + .help("Number of threads to use for the event loop.") + .default_value("4") .takes_value(true), ) .arg( Arg::with_name("metrics-addr") .long("metrics-addr") .help("A SocketAddr (as a string) from which to serve metrics to Prometheus.") - .takes_value(true) + .takes_value(true), ) .arg( Arg::with_name("max-batch-size") @@ -147,19 +158,79 @@ fn main() { .arg( Arg::with_name("no-persist-identity") .long("no-persist-identity") - .help("Do not persist enclave identity (useful for contract development)") + .help("Do not persist enclave identity (useful for contract development)"), + ) + .arg( + Arg::with_name("key-pair") + .long("key-pair") + .help("Path to key pair for this compute node (if not set, a new key pair will be generated)") + .takes_value(true) ) .get_matches(); + // Initialize logger. 
+ pretty_env_logger::formatted_builder() + .unwrap() + .filter(None, LevelFilter::Trace) + .init(); + + // Setup key pair. + let mut key_pair = if let Some(filename) = matches.value_of("key-pair") { + // Load key pair from existing file. + if let Ok(mut file) = File::open(filename) { + let mut key_pair = vec![]; + file.read_to_end(&mut key_pair).unwrap(); + info!("Loaded node key pair from {}", filename); + + Some(key_pair) + } else { + None + } + } else { + None + }; + + if key_pair.is_none() { + // Generate new key pair. + info!("Generating new key pair"); + let rng = SystemRandom::new(); + let new_key_pair = Ed25519KeyPair::generate_pkcs8(&rng).unwrap().to_vec(); + + if let Some(filename) = matches.value_of("key-pair") { + // Persist key pair to file. + let mut file = File::create(filename).expect("unable to create key pair file"); + file.write(&new_key_pair).unwrap(); + } + + key_pair = Some(new_key_pair); + } + + let key_pair = Ed25519KeyPair::from_pkcs8(untrusted::Input::from(&key_pair.unwrap())).unwrap(); + let signer = Arc::new(InMemorySigner::new(key_pair)); + + info!("Using public key {:?}", signer.get_public_key()); + // Setup consensus backend. + // TODO: Get backend configuration from command line or configuration file. // TODO: Change dummy backend to get computation group from another backend. - let consensus_backend = Box::new(ekiden_consensus_dummy::DummyConsensusBackend::new(vec![])); + let consensus_backend = Arc::new(ekiden_consensus_dummy::DummyConsensusBackend::new(vec![ + CommitteeNode { + role: Role::Leader, + public_key: signer.get_public_key(), + }, + ])); // Setup compute node. let mut node = ComputeNode::new(ComputeNodeConfiguration { - grpc_threads: value_t!(matches, "grpc-threads", usize).unwrap_or(1), + grpc_threads: value_t!(matches, "grpc-threads", usize).unwrap_or_else(|e| e.exit()), port: value_t!(matches, "port", u16).unwrap_or(9001), - consensus_backend, + // Consensus configuration. + consensus: ConsensusConfiguration { + backend: consensus_backend, + signer: signer, + max_batch_size: value_t!(matches, "max-batch-size", usize).unwrap_or(1000), + max_batch_timeout: value_t!(matches, "max-batch-timeout", u64).unwrap_or(1000), + }, // IAS configuration. ias: if matches.is_present("ias-spid") { Some(IASConfiguration { @@ -167,7 +238,7 @@ fn main() { pkcs12_archive: matches.value_of("ias-pkcs12").unwrap().to_string(), }) } else { - eprintln!("WARNING: IAS is not configured, validation will always return an error."); + warn!("IAS is not configured, validation will always return an error."); None }, @@ -185,8 +256,7 @@ fn main() { consensus_host: matches.value_of("consensus-host").unwrap().to_owned(), // TODO: Remove this after we switch to new storage backend. consensus_port: value_t!(matches, "consensus-port", u16).unwrap_or(9002), - max_batch_size: value_t!(matches, "max-batch-size", usize).unwrap_or(1000), - max_batch_timeout: value_t!(matches, "max-batch-timeout", u64).unwrap_or(1000), + saved_identity_path: if matches.is_present("no-persist-identity") { None } else { diff --git a/compute/src/node.rs b/compute/src/node.rs index 2d6213f7639..223d349ca54 100644 --- a/compute/src/node.rs +++ b/compute/src/node.rs @@ -1,3 +1,4 @@ +//! Compute node. 
use std::sync::Arc; use grpcio; @@ -5,19 +6,40 @@ use grpcio; use ekiden_compute_api::create_compute; use ekiden_consensus_base::ConsensusBackend; use ekiden_core::error::Result; +use ekiden_core::futures::{Executor, Future}; +use super::consensus::{ConsensusConfiguration, ConsensusFrontend}; use super::ias::{IASConfiguration, IAS}; use super::server::ComputeService; use super::worker::{Worker, WorkerConfiguration}; +/// Executor that uses the gRPC environment for execution. +struct GrpcExecutor(grpcio::Client); + +impl GrpcExecutor { + fn new(environment: Arc) -> Self { + GrpcExecutor( + // Create a dummy channel, needed for executing futures. This is required because + // the API for doing this directly using an Executor is not exposed. + grpcio::Client::new(grpcio::ChannelBuilder::new(environment).connect("")), + ) + } +} + +impl Executor for GrpcExecutor { + fn spawn(&mut self, f: Box + Send>) { + self.0.spawn(f); + } +} + /// Compute node configuration. pub struct ComputeNodeConfiguration { /// Number of gRPC threads. pub grpc_threads: usize, /// gRPC server port. pub port: u16, - /// Consensus backend. - pub consensus_backend: Box, + /// Consensus configuration. + pub consensus: ConsensusConfiguration, /// IAS configuration. pub ias: Option, /// Worker configuration. @@ -27,9 +49,13 @@ pub struct ComputeNodeConfiguration { /// Compute node. pub struct ComputeNode { /// Consensus backend. - consensus_backend: Box, + consensus_backend: Arc, + /// Consensus frontend. + consensus_frontend: Arc, /// gRPC server. server: grpcio::Server, + /// Futures executor used by this compute node. + executor: GrpcExecutor, } impl ComputeNode { @@ -44,26 +70,38 @@ impl ComputeNode { // Create worker. let worker = Arc::new(Worker::new(config.worker, grpc_environment.clone(), ias)); + // Create consensus frontend. + let consensus_backend = config.consensus.backend.clone(); + let consensus_frontend = Arc::new(ConsensusFrontend::new(config.consensus, worker.clone())); + // Create compute node gRPC server. - let service = create_compute(ComputeService::new(worker.clone())); + let service = create_compute(ComputeService::new(worker, consensus_frontend.clone())); let server = grpcio::ServerBuilder::new(grpc_environment.clone()) .register_service(service) .bind("0.0.0.0", config.port) .build()?; Ok(Self { - consensus_backend: config.consensus_backend, + consensus_backend, + consensus_frontend, server, + executor: GrpcExecutor::new(grpc_environment), }) } /// Start compute node. pub fn start(&mut self) { + // Start consensus backend tasks. + self.consensus_backend.start(&mut self.executor); + + // Start consensus frontend tasks. + self.consensus_frontend.start(&mut self.executor); + + // Start gRPC server. self.server.start(); for &(ref host, port) in self.server.bind_addrs() { - // TODO: Use proper logging. 
- println!("Compute node listening on {}:{}", host, port); + info!("Compute node listening on {}:{}", host, port); } } } diff --git a/compute/src/server.rs b/compute/src/server.rs index 52d3a697a14..22c5c411b28 100644 --- a/compute/src/server.rs +++ b/compute/src/server.rs @@ -1,25 +1,23 @@ use std::error::Error; use std::sync::Arc; -use std::sync::mpsc::Sender; use grpcio; use grpcio::{RpcStatus, RpcStatusCode}; -use thread_local::ThreadLocal; use ekiden_compute_api::{CallContractRequest, CallContractResponse, Compute, WaitContractCallRequest, WaitContractCallResponse}; use ekiden_core::bytes::H256; use ekiden_core::futures::Future; -use ekiden_core::futures::sync::oneshot; +use super::consensus::ConsensusFrontend; use super::instrumentation; -use super::worker::{Command, Worker}; +use super::worker::Worker; struct ComputeServiceInner { /// Worker. worker: Arc, - /// Thread-local channel for submitting commands to the worker. - tl_command_sender: ThreadLocal>, + /// Consensus frontend. + consensus_frontend: Arc, /// Instrumentation objects. ins: instrumentation::HandlerMetrics, } @@ -31,22 +29,15 @@ pub struct ComputeService { impl ComputeService { /// Create new compute server instance. - pub fn new(worker: Arc) -> Self { + pub fn new(worker: Arc, consensus_frontend: Arc) -> Self { ComputeService { inner: Arc::new(ComputeServiceInner { worker, - tl_command_sender: ThreadLocal::new(), + consensus_frontend, ins: instrumentation::HandlerMetrics::new(), }), } } - - /// Get thread-local command sender. - fn get_command_sender(&self) -> &Sender { - self.inner - .tl_command_sender - .get_or(|| Box::new(self.inner.worker.get_command_sender())) - } } impl Compute for ComputeService { @@ -60,14 +51,12 @@ impl Compute for ComputeService { self.inner.ins.reqs_received.inc(); let _client_timer = self.inner.ins.req_time_client.start_timer(); - // Send command to worker thread. - let (response_sender, response_receiver) = oneshot::channel(); - self.get_command_sender() - .send(Command::RpcCall( - rpc_request.take_payload(), - response_sender, - )) - .unwrap(); + // Send command to worker thread and request any generated batches to be handled + // by our consensus frontend. + let response_receiver = self.inner.worker.rpc_call( + rpc_request.take_payload(), + self.inner.consensus_frontend.clone(), + ); // Prepare response future. let f = response_receiver.then(|result| match result { @@ -105,26 +94,18 @@ impl Compute for ComputeService { } // Send command to worker thread. - let (response_sender, response_receiver) = oneshot::channel(); - self.get_command_sender() - .send(Command::SubscribeCall( - H256::from(request.get_call_id()), - response_sender, - )) - .unwrap(); + let response_receiver = self.inner + .consensus_frontend + .subscribe_call(H256::from(request.get_call_id())); // Prepare response future. 
let f = response_receiver.then(|result| match result { - Ok(Ok(response)) => { + Ok(response) => { let mut rpc_response = WaitContractCallResponse::new(); rpc_response.set_output(response); sink.success(rpc_response) } - Ok(Err(error)) => sink.fail(RpcStatus::new( - RpcStatusCode::Internal, - Some(error.description().to_owned()), - )), Err(error) => sink.fail(RpcStatus::new( RpcStatusCode::Internal, Some(error.description().to_owned()), diff --git a/compute/src/worker.rs b/compute/src/worker.rs index 2a5bba613a1..52f56898409 100644 --- a/compute/src/worker.rs +++ b/compute/src/worker.rs @@ -1,33 +1,30 @@ -use std; use std::borrow::Borrow; -use std::collections::HashMap; -use std::collections::hash_map::Entry; use std::fmt::Write; use std::ops::Deref; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread; -use std::time::{Duration, Instant}; use grpcio; -use lru_cache::LruCache; use protobuf; use protobuf::Message; +use thread_local::ThreadLocal; use ekiden_consensus_api::{self, ConsensusClient}; +use ekiden_consensus_base::Block; use ekiden_core::bytes::H256; use ekiden_core::contract::batch::{CallBatch, OutputBatch}; use ekiden_core::enclave::api::IdentityProof; use ekiden_core::enclave::quote; use ekiden_core::error::{Error, Result}; use ekiden_core::futures::sync::oneshot; -use ekiden_core::hash::EncodedHash; use ekiden_core::rpc::api; use ekiden_core::rpc::client::ClientEndpoint; use ekiden_untrusted::{Enclave, EnclaveContract, EnclaveDb, EnclaveIdentity, EnclaveRpc}; use ekiden_untrusted::rpc::router::RpcRouter; +use super::consensus::ConsensusFrontend; use super::handlers; use super::ias::IAS; use super::instrumentation; @@ -37,28 +34,28 @@ pub type BytesResult = Result>; /// Result bytes sender part of the channel. pub type BytesSender = oneshot::Sender; -/// Call batch that is being constructed. +/// Computed batch. #[derive(Debug)] -pub struct PendingBatch { - /// Instant when first item was queued in the batch. - start: Instant, - /// Call batch. - batch: CallBatch, +pub struct ComputedBatch { + /// Block this batch was computed against. + pub block: Block, + /// Batch of contract calls. + pub calls: CallBatch, + /// Batch of contract outputs. + pub outputs: OutputBatch, + /// New state root hash. + pub new_state_root: H256, } /// Command sent to the worker thread. -#[derive(Debug)] -pub enum Command { +enum Command { /// RPC call from a client. - RpcCall(Vec, BytesSender), + RpcCall(Vec, BytesSender, Arc), /// Contract call batch process request. - ContractCallBatch(CallBatch), - /// Contract call subscription request. - SubscribeCall(H256, BytesSender), - /// Ping worker. - Ping, + ContractCallBatch(CallBatch, Block, oneshot::Sender>), } +// TODO: Remove once we start using the new storage backend. struct CachedStateInitialized { encrypted_state: Vec, height: u64, @@ -66,6 +63,7 @@ struct CachedStateInitialized { struct WorkerInner { /// Consensus client. + // TODO: Remove once we start using the new storage backend. consensus: Option, /// Contract running in an enclave. contract: Enclave, @@ -74,21 +72,10 @@ struct WorkerInner { identity_proof: IdentityProof, /// Cached state reconstituted from checkpoint and diffs. None if /// cache or state is uninitialized. + // TODO: Remove once we start using the new storage backend. cached_state: Option, /// Instrumentation objects. ins: instrumentation::WorkerMetrics, - /// Maximum batch size. - max_batch_size: usize, - /// Maximum batch timeout. 
- max_batch_timeout: Duration, - /// Current batch. - current_batch: Option, - /// Batch call subscriptions. - subscriptions_call: HashMap>, - /// Processed calls without subscriptions. We keep a LRU cache of such call results - /// around so that subscription requests can arrive even after the batch has been - /// processed. - missed_calls: LruCache, } impl WorkerInner { @@ -97,6 +84,7 @@ impl WorkerInner { Self::create_contract(&config.contract_filename, ias, config.saved_identity_path); // Construct consensus client. + // TODO: Remove once we start using the new storage backend. let consensus = match config.consensus_host.as_ref() { "none" => None, consensus_host => { @@ -110,14 +98,11 @@ impl WorkerInner { Self { contract, identity_proof, + // TODO: Remove once we start using the new storage backend. cached_state: None, ins: instrumentation::WorkerMetrics::new(), - max_batch_size: config.max_batch_size, - max_batch_timeout: Duration::from_millis(config.max_batch_timeout), - consensus: consensus, - current_batch: None, - subscriptions_call: HashMap::new(), - missed_calls: LruCache::new(config.max_batch_size * 2), + // TODO: Remove once we start using the new storage backend. + consensus, } } @@ -145,11 +130,12 @@ impl WorkerInner { write!(&mut mr_enclave, "{:02x}", byte).unwrap(); } - println!("Loaded contract with MRENCLAVE: {}", mr_enclave); + info!("Loaded contract with MRENCLAVE: {}", mr_enclave); (contract, identity_proof) } + // TODO: Remove once we start using the new storage backend. #[cfg(not(feature = "no_cache"))] fn get_cached_state_height(&self) -> Option { match self.cached_state.as_ref() { @@ -158,6 +144,7 @@ impl WorkerInner { } } + // TODO: Remove once we start using the new storage backend. fn set_cached_state(&mut self, checkpoint: &ekiden_consensus_api::Checkpoint) -> Result<()> { self.cached_state = Some(CachedStateInitialized { encrypted_state: checkpoint.get_payload().to_vec(), @@ -166,6 +153,7 @@ impl WorkerInner { Ok(()) } + // TODO: Remove once we start using the new storage backend. fn advance_cached_state(&mut self, diffs: &[Vec]) -> Result> { #[cfg(feature = "no_diffs")] assert!( @@ -187,6 +175,7 @@ impl WorkerInner { fn call_contract_batch_fallible(&mut self, batch: &CallBatch) -> Result { // Get state updates from consensus + // TODO: Remove once we start using the new storage backend. let encrypted_state_opt = if self.consensus.is_some() { let _consensus_get_timer = self.ins.consensus_get_time.start_timer(); @@ -228,6 +217,7 @@ impl WorkerInner { None }; + // TODO: Remove once we start using the new storage backend. #[cfg(not(feature = "no_diffs"))] let orig_encrypted_state_opt = encrypted_state_opt.clone(); #[cfg(feature = "no_diffs")] @@ -243,6 +233,7 @@ impl WorkerInner { self.contract.contract_call_batch(batch) }?; + // TODO: Remove once we start using the new storage backend. // Check if any state was produced. In case no state was produced, this means that // no request caused a state update and thus no state update is required. let encrypted_state = self.contract.db_state_get()?; @@ -297,148 +288,74 @@ impl WorkerInner { } /// Handle contract call batch. - fn handle_contract_batch(&mut self, batch: CallBatch) { - let outputs = self.call_contract_batch_fallible(&batch); + fn handle_contract_batch( + &mut self, + calls: CallBatch, + block: Block, + sender: oneshot::Sender>, + ) { + // TODO: Use block to get the state root hash for storage. 
+ let outputs = self.call_contract_batch_fallible(&calls); match outputs { - Ok(mut outputs) => { - // No errors, send per-call outputs. - for (output, call) in outputs.drain(..).zip(batch.iter()) { - let call_id = call.get_encoded_hash(); - if let Some(senders) = self.subscriptions_call.remove(&call_id) { - for sender in senders { - // Explicitly ignore send errors as the receiver may have gone. - drop(sender.send(Ok(output.clone()))); - } - } - - self.missed_calls.insert(call_id, Ok(output)); - } + Ok(outputs) => { + // No errors, hand over the batch to consensus. + // TODO: Use actual state root hash. + let new_state_root = H256::zero(); + sender + .send(Ok(ComputedBatch { + block, + calls, + outputs, + new_state_root, + })) + .unwrap(); } Err(error) => { // Batch-wide error has occurred. - eprintln!("batch-wide error: {:?}", error); - - for call in batch.iter() { - let call_id = call.get_encoded_hash(); - if let Some(senders) = self.subscriptions_call.remove(&call_id) { - for sender in senders { - // Explicitly ignore send errors as the receiver may have gone. - drop(sender.send(Err(error.clone()))); - } - } - - self.missed_calls.insert(call_id, Err(error.clone())); - } + error!("Batch-wide error: {:?}", error); + sender.send(Err(error)).unwrap(); } } } /// Check if the most recent RPC call produced any contract calls and queue them /// in the current call batch. - fn queue_contract_batch(&mut self) { + fn check_and_append_contract_batch(&self, consensus_frontend: Arc) { // Check if the most recent RPC call produced any contract calls. - let mut batch = self.contract.contract_take_batch().unwrap(); - if batch.is_empty() { - return; - } - - if let Some(ref mut current_batch) = self.current_batch { - // Append to current batch. - current_batch.batch.append(&mut batch); - } else { - // Start new batch. - self.current_batch = Some(PendingBatch { - start: Instant::now(), - batch, - }); - } - } - - /// Check if we need to send the current batch for processing. - /// - /// The batch is then sent for processing if either: - /// * Number of calls it contains reaches `max_batch_size`. - /// * More than `max_batch_timeout` time elapsed since batch was created. - fn check_and_send_contract_batch(&mut self, command_sender: &Sender) { - let should_process = if let Some(ref current_batch) = self.current_batch { - current_batch.batch.len() >= self.max_batch_size - || current_batch.start.elapsed() >= self.max_batch_timeout - } else { - false - }; - - if should_process { - // Unwrap is safe as if the batch was none, we should not enter this block. - let current_batch = self.current_batch.take().unwrap(); - command_sender - .send(Command::ContractCallBatch(current_batch.batch)) - .unwrap(); + match self.contract.contract_take_batch() { + Ok(batch) => { + // We got a batch of calls, send it to consensus frontend for batching. + if !batch.is_empty() { + consensus_frontend.append_batch(batch); + } + } + Err(error) => { + error!( + "Failed to take contract batch from contract: {}", + error.message + ); + } } } - /// Remove any subscribers where the receiver part has been dropped. - fn clean_subscribers(&mut self) { - self.subscriptions_call.retain(|_call_id, senders| { - // Only retain non-canceled senders. - senders.retain(|sender| !sender.is_canceled()); - // Only retain call ids for which there are subscriptions. - !senders.is_empty() - }); - } - /// Process requests from a receiver until the channel closes. 
- fn work(&mut self, command_sender: Sender, command_receiver: Receiver) { - // Ping processing thread every max_batch_timeout. - let command_sender_clone = command_sender.clone(); - let max_batch_timeout = self.max_batch_timeout; - std::thread::spawn(move || { - while command_sender_clone.send(Command::Ping).is_ok() { - std::thread::sleep(max_batch_timeout); - } - }); - + fn work(&mut self, command_receiver: Receiver) { // Block for the next call. while let Ok(command) = command_receiver.recv() { match command { - Command::RpcCall(request, sender) => { + Command::RpcCall(request, sender, consensus_frontend) => { // Process (stateless) RPC call. let result = self.handle_rpc_call(request); sender.send(result).unwrap(); // Check if RPC call produced a batch of requests. - self.queue_contract_batch(); + self.check_and_append_contract_batch(consensus_frontend); } - Command::ContractCallBatch(batch) => { + Command::ContractCallBatch(calls, block, sender) => { // Process batch of contract calls. - self.handle_contract_batch(batch); - } - Command::SubscribeCall(call_id, sender) => { - self.subscribe_contract_batch(call_id, sender); + self.handle_contract_batch(calls, block, sender); } - Command::Ping => {} - } - - self.check_and_send_contract_batch(&command_sender); - self.clean_subscribers(); - } - } - - /// Subscribe to a specific call being processed in a batch. - fn subscribe_contract_batch(&mut self, call_id: H256, sender: BytesSender) { - // First check if there are any hits under missed calls. In this case emit - // the result immediately. - if let Some(result) = self.missed_calls.get_mut(&call_id) { - sender.send(result.clone()).unwrap(); - return; - } - - match self.subscriptions_call.entry(call_id) { - Entry::Occupied(mut entry) => { - entry.get_mut().push(sender); - } - Entry::Vacant(entry) => { - entry.insert(vec![sender]); } } } @@ -459,13 +376,11 @@ pub struct WorkerConfiguration { /// Contract binary filename. pub contract_filename: String, /// Consensus host. + // TODO: Remove once we start using the new storage backend. pub consensus_host: String, /// Consensus port. + // TODO: Remove once we start using the new storage backend. pub consensus_port: u16, - /// Max batch size. - pub max_batch_size: usize, - /// Max batch timeout. - pub max_batch_timeout: u64, /// Optional saved identity path. pub saved_identity_path: Option, /// Key manager configuration. @@ -476,6 +391,9 @@ pub struct WorkerConfiguration { pub struct Worker { /// Channel for submitting commands to the worker. command_sender: Mutex>, + /// Thread-local clone of the command sender which is required to avoid locking the + /// mutex each time we need to send a command. + tl_command_sender: ThreadLocal>, } impl Worker { @@ -500,23 +418,56 @@ impl Worker { } } - // Worker command channel. - let (command_sender, command_receiver) = channel(); - let command_sender_clone = command_sender.clone(); - // Spawn inner worker in a separate thread. + let (command_sender, command_receiver) = channel(); thread::spawn(move || { - WorkerInner::new(config, ias).work(command_sender_clone, command_receiver); + WorkerInner::new(config, ias).work(command_receiver); }); Self { command_sender: Mutex::new(command_sender), + tl_command_sender: ThreadLocal::new(), } } /// Get new clone of command sender for communicating with the worker. 
- pub fn get_command_sender(&self) -> Sender { - let command_sender = self.command_sender.lock().unwrap(); - command_sender.clone() + fn get_command_sender(&self) -> &Sender { + self.tl_command_sender.get_or(|| { + let command_sender = self.command_sender.lock().unwrap(); + Box::new(command_sender.clone()) + }) + } + + /// Queue an RPC call with the worker. + /// + /// Returns a receiver that will be used to deliver the response. + pub fn rpc_call( + &self, + request: Vec, + consensus_frontend: Arc, + ) -> oneshot::Receiver { + let (response_sender, response_receiver) = oneshot::channel(); + self.get_command_sender() + .send(Command::RpcCall( + request, + response_sender, + consensus_frontend, + )) + .unwrap(); + + response_receiver + } + + pub fn contract_call_batch( + &self, + calls: CallBatch, + block: Block, + ) -> oneshot::Receiver> { + let (response_sender, response_receiver) = oneshot::channel(); + self.get_command_sender() + .send(Command::ContractCallBatch(calls, block, response_sender)) + .unwrap(); + + response_receiver } } diff --git a/consensus/base/src/backend.rs b/consensus/base/src/backend.rs index c7e67e10ee8..3847e99b089 100644 --- a/consensus/base/src/backend.rs +++ b/consensus/base/src/backend.rs @@ -1,7 +1,7 @@ //! Consensus backend interface. use ekiden_common::bytes::B64; use ekiden_common::error::Error; -use ekiden_common::futures::{BoxFuture, BoxStream, Future, Stream}; +use ekiden_common::futures::{BoxFuture, BoxStream, Executor, Future, Stream}; use ekiden_common::signature::Signed; use super::{Block, Commitment, Header, Reveal}; @@ -22,6 +22,12 @@ pub enum Event { /// Consensus backend implementing the Ekiden consensus interface. pub trait ConsensusBackend { + /// Start consensus backend. + fn start(&self, executor: &mut Executor); + + /// Ask the backend tasks to terminate. + fn shutdown(&self); + /// Return the latest consensus block. /// /// The metadata contained in this block can be further used to get the latest diff --git a/consensus/dummy/Cargo.toml b/consensus/dummy/Cargo.toml index be87bc31e4a..bbb5dfaf41b 100644 --- a/consensus/dummy/Cargo.toml +++ b/consensus/dummy/Cargo.toml @@ -9,3 +9,4 @@ repository = "https://github.com/oasislabs/ekiden" [dependencies] ekiden-common = { path = "../../common", version = "0.1.0-alpha.4" } ekiden-consensus-base = { path = "../base", version = "0.1.0-alpha.4" } +log = "0.4" diff --git a/consensus/dummy/src/backend.rs b/consensus/dummy/src/backend.rs index 0b726f3a711..79ff590686e 100644 --- a/consensus/dummy/src/backend.rs +++ b/consensus/dummy/src/backend.rs @@ -4,8 +4,8 @@ use std::sync::{Arc, Mutex}; use ekiden_common::bytes::{B256, H256}; use ekiden_common::error::{Error, Result}; -use ekiden_common::futures::{future, BoxFuture, BoxStream, Stream}; -use ekiden_common::futures::sync::mpsc; +use ekiden_common::futures::{future, BoxFuture, BoxStream, Executor, Future, Stream}; +use ekiden_common::futures::sync::{mpsc, oneshot}; use ekiden_common::signature::Signed; use ekiden_common::uint::U256; use ekiden_consensus_base::*; @@ -162,16 +162,19 @@ impl Round { fn try_finalize(&mut self) -> Result { // Check if all nodes sent commitments. 
         if self.commitments.len() != self.computation_group.len() {
+            info!("Still waiting for other round participants to commit");
             return Ok(FinalizationResult::StillWaiting);
         }
 
         if self.state == State::WaitingCommitments {
+            info!("Commitments received, now waiting for reveals");
             self.state = State::WaitingRevealsAndBlock;
             return Ok(FinalizationResult::NotifyReveals);
         }
 
         // Check if all nodes sent reveals.
         if self.reveals.len() != self.computation_group.len() {
+            info!("Still waiting for other round participants to reveal");
             return Ok(FinalizationResult::StillWaiting);
         }
 
@@ -182,6 +185,7 @@ impl Round {
         };
 
         // Everything is ready, try to finalize round.
+        info!("Attempting to finalize round");
         for node_id in self.computation_group.keys() {
             let reveal = self.reveals.get(node_id).unwrap();
             let commitment = self.commitments.get(node_id).unwrap();
@@ -213,43 +217,64 @@ impl Round {
 
         // TODO: Check if storage backend contains correct state root.
 
+        info!("Round has been finalized");
         Ok(FinalizationResult::Finalized(block))
     }
 }
 
+#[derive(Debug)]
+enum Command {
+    Commit(Commitment, oneshot::Sender<Result<()>>),
+    Reveal(Reveal<Header>, oneshot::Sender<Result<()>>),
+    Submit(Signed<Block>, oneshot::Sender<Result<()>>),
+}
+
 struct DummyConsensusBackendInner {
     /// In-memory blockchain.
-    blocks: Vec<Block>,
+    blocks: Mutex<Vec<Block>>,
     /// Current round.
-    round: Round,
+    round: Mutex<Round>,
     /// Block subscribers.
-    block_subscribers: Vec<mpsc::UnboundedSender<Block>>,
+    block_subscribers: Mutex<Vec<mpsc::UnboundedSender<Block>>>,
     /// Event subscribers.
-    event_subscribers: Vec<mpsc::UnboundedSender<Event>>,
+    event_subscribers: Mutex<Vec<mpsc::UnboundedSender<Event>>>,
+    /// Shutdown signal sender (until used).
+    shutdown_sender: Mutex<Option<oneshot::Sender<()>>>,
+    /// Shutdown signal receiver (until initialized).
+    shutdown_receiver: Mutex<Option<oneshot::Receiver<()>>>,
+    /// Command sender.
+    command_sender: mpsc::UnboundedSender<Command>,
+    /// Command receiver (until initialized).
+    command_receiver: Mutex<Option<mpsc::UnboundedReceiver<Command>>>,
 }
 
 impl DummyConsensusBackendInner {
     /// Notify subscribers of a new block.
-    fn notify_block(&mut self, block: &Block) {
-        self.block_subscribers
-            .retain(|ref s| s.unbounded_send(block.clone()).is_ok());
+    fn notify_block(&self, block: &Block) {
+        let mut block_subscribers = self.block_subscribers.lock().unwrap();
+        block_subscribers.retain(|ref s| s.unbounded_send(block.clone()).is_ok());
     }
 
     /// Notify subscribers of a new event.
-    fn notify_event(&mut self, event: &Event) {
-        self.event_subscribers
-            .retain(|ref s| s.unbounded_send(event.clone()).is_ok());
+    fn notify_event(&self, event: &Event) {
+        let mut event_subscribers = self.event_subscribers.lock().unwrap();
+        event_subscribers.retain(|ref s| s.unbounded_send(event.clone()).is_ok());
     }
 
     /// Attempt to finalize the current round.
-    fn try_finalize(&mut self) {
-        let result = self.round.try_finalize();
+    fn try_finalize(&self) {
+        let mut round = self.round.lock().unwrap();
+        let result = round.try_finalize();
 
         match result {
             Ok(FinalizationResult::Finalized(block)) => {
                 // Round has been finalized, block is ready.
-                self.blocks.push(block.clone());
-                self.round.reset(Some(block.clone()));
+                {
+                    let mut blocks = self.blocks.lock().unwrap();
+                    blocks.push(block.clone());
+                }
+
+                round.reset(Some(block.clone()));
                 self.notify_block(&block);
             }
             Ok(FinalizationResult::StillWaiting) => {
@@ -261,7 +286,9 @@ impl DummyConsensusBackendInner {
             }
             Err(error) => {
                 // Round has failed.
-                self.round.reset(None);
+                error!("Round has failed: {:?}", error);
+
+                round.reset(None);
                 self.notify_event(&Event::RoundFailed(error));
             }
         }
@@ -273,20 +300,33 @@ impl DummyConsensusBackendInner {
 /// **This backend should only be used to test implementations that use the consensus
 /// interface but it only simulates a consensus backend.***
 pub struct DummyConsensusBackend {
-    inner: Arc<Mutex<DummyConsensusBackendInner>>,
+    inner: Arc<DummyConsensusBackendInner>,
 }
 
 impl DummyConsensusBackend {
+    /// Create new dummy consensus backend.
     pub fn new(computation_group: Vec<CommitteeNode>) -> Self {
+        info!(
+            "Creating dummy consensus backend with {} member(s) in computation group",
+            computation_group.len()
+        );
         let genesis_block = Self::get_genesis_block(computation_group);
+
+        // Create channels.
+ let (command_sender, command_receiver) = mpsc::unbounded(); + let (shutdown_sender, shutdown_receiver) = oneshot::channel(); + Self { - inner: Arc::new(Mutex::new(DummyConsensusBackendInner { - blocks: vec![genesis_block.clone()], - round: Round::new(genesis_block), - block_subscribers: vec![], - event_subscribers: vec![], - })), + inner: Arc::new(DummyConsensusBackendInner { + blocks: Mutex::new(vec![genesis_block.clone()]), + round: Mutex::new(Round::new(genesis_block)), + block_subscribers: Mutex::new(vec![]), + event_subscribers: Mutex::new(vec![]), + shutdown_sender: Mutex::new(Some(shutdown_sender)), + shutdown_receiver: Mutex::new(Some(shutdown_receiver)), + command_sender, + command_receiver: Mutex::new(Some(command_receiver)), + }), } } @@ -310,67 +350,122 @@ impl DummyConsensusBackend { block.update(); block } + + /// Send a command to the backend task. + fn send_command( + &self, + command: Command, + receiver: oneshot::Receiver>, + ) -> BoxFuture<()> { + if let Err(_) = self.inner.command_sender.unbounded_send(command) { + return Box::new(future::err(Error::new("command channel closed"))); + } + + Box::new(receiver.then(|result| match result { + Ok(result) => result, + Err(_) => Err(Error::new("response channel closed")), + })) + } } impl ConsensusBackend for DummyConsensusBackend { - fn get_blocks(&self) -> BoxStream { - let mut inner = self.inner.lock().unwrap(); + fn start(&self, executor: &mut Executor) { + info!("Starting dummy consensus backend"); + + // Create command processing channel. + let command_receiver = self.inner + .command_receiver + .lock() + .unwrap() + .take() + .expect("start already called"); + let command_processor: BoxFuture<()> = { + let shared_inner = self.inner.clone(); + + Box::new( + command_receiver + .map_err(|_| Error::new("command channel closed")) + .for_each(move |command| -> BoxFuture<()> { + let (sender, result) = { + let mut round = shared_inner.round.lock().unwrap(); + + match command { + Command::Commit(commitment, sender) => { + (sender, round.add_commitment(commitment)) + } + Command::Reveal(reveal, sender) => { + (sender, round.add_reveal(reveal)) + } + Command::Submit(block, sender) => (sender, round.add_submit(block)), + } + }; + + shared_inner.try_finalize(); + drop(sender.send(result)); + + Box::new(future::ok(())) + }), + ) + }; + + // Create shutdown signal handler. 
+ let shutdown_receiver = self.inner + .shutdown_receiver + .lock() + .unwrap() + .take() + .expect("start already called"); + let shutdown = Box::new(shutdown_receiver.then(|_| Err(Error::new("shutdown")))); + + executor.spawn(Box::new( + future::join_all(vec![command_processor, shutdown]).then(|_| future::ok(())), + )); + } + + fn shutdown(&self) { + info!("Shutting down dummy consensus backend"); + + if let Some(shutdown_sender) = self.inner.shutdown_sender.lock().unwrap().take() { + drop(shutdown_sender.send(())); + } + } + fn get_blocks(&self) -> BoxStream { let (sender, receiver) = mpsc::unbounded(); - match inner.blocks.last() { - Some(block) => sender.unbounded_send(block.clone()).unwrap(), - None => {} + { + let blocks = self.inner.blocks.lock().unwrap(); + match blocks.last() { + Some(block) => drop(sender.unbounded_send(block.clone())), + None => {} + } } - inner.block_subscribers.push(sender); + + let mut block_subscribers = self.inner.block_subscribers.lock().unwrap(); + block_subscribers.push(sender); Box::new(receiver.map_err(|_| Error::new("channel closed"))) } fn get_events(&self) -> BoxStream { - let mut inner = self.inner.lock().unwrap(); - let (sender, receiver) = mpsc::unbounded(); - inner.event_subscribers.push(sender); + let mut event_subscribers = self.inner.event_subscribers.lock().unwrap(); + event_subscribers.push(sender); Box::new(receiver.map_err(|_| Error::new("channel closed"))) } fn commit(&self, commitment: Commitment) -> BoxFuture<()> { - let inner = self.inner.clone(); - - Box::new(future::lazy(move || { - let mut inner = inner.lock().unwrap(); - - inner.round.add_commitment(commitment)?; - inner.try_finalize(); - - Ok(()) - })) + let (sender, receiver) = oneshot::channel(); + self.send_command(Command::Commit(commitment, sender), receiver) } fn reveal(&self, reveal: Reveal
) -> BoxFuture<()> { - let inner = self.inner.clone(); - - Box::new(future::lazy(move || { - let mut inner = inner.lock().unwrap(); - - inner.round.add_reveal(reveal)?; - inner.try_finalize(); - - Ok(()) - })) + let (sender, receiver) = oneshot::channel(); + self.send_command(Command::Reveal(reveal, sender), receiver) } fn submit(&self, block: Signed) -> BoxFuture<()> { - let inner = self.inner.clone(); - - Box::new(future::lazy(move || { - let mut inner = inner.lock().unwrap(); - - inner.round.add_submit(block)?; - inner.try_finalize(); - - Ok(()) - })) + let (sender, receiver) = oneshot::channel(); + self.send_command(Command::Submit(block, sender), receiver) } } diff --git a/consensus/dummy/src/lib.rs b/consensus/dummy/src/lib.rs index 23b5277ced3..f46a66cb4a0 100644 --- a/consensus/dummy/src/lib.rs +++ b/consensus/dummy/src/lib.rs @@ -1,4 +1,7 @@ //! Ekiden dummy consensus backend. +#[macro_use] +extern crate log; + extern crate ekiden_common; extern crate ekiden_consensus_base; diff --git a/consensus/dummy/tests/backend.rs b/consensus/dummy/tests/backend.rs index bd3ca684981..dfbff6c12ba 100644 --- a/consensus/dummy/tests/backend.rs +++ b/consensus/dummy/tests/backend.rs @@ -4,7 +4,7 @@ extern crate ekiden_consensus_dummy; use std::sync::Arc; -use ekiden_common::futures::{future, BoxFuture, Future, Stream}; +use ekiden_common::futures::{cpupool, future, Future, Stream}; use ekiden_consensus_base::ConsensusBackend; use ekiden_consensus_base::test::create_computation_group; use ekiden_consensus_dummy::DummyConsensusBackend; @@ -18,11 +18,17 @@ fn test_dummy_backend_two_rounds() { computation_group.iter().map(|n| n.get_public()).collect(), )); + let mut pool = cpupool::CpuPool::new(4); + + // Start backend. + backend.start(&mut pool); + // Start all nodes. - let mut node_tasks: Vec> = computation_group + let mut tasks = vec![]; + tasks.append(&mut computation_group .iter() .map(|n| n.start(backend.clone())) - .collect(); + .collect()); // Send compute requests to all nodes. for ref node in computation_group.iter() { @@ -46,6 +52,9 @@ fn test_dummy_backend_two_rounds() { for ref node in computation_group.iter() { node.shutdown(); } + + let backend = backend.clone(); + backend.shutdown(); } round => panic!("incorrect round number: {}", round), } @@ -53,8 +62,8 @@ fn test_dummy_backend_two_rounds() { Ok(()) }); - node_tasks.push(Box::new(wait_rounds)); + tasks.push(Box::new(wait_rounds)); // Wait for all tasks to finish. - future::join_all(node_tasks).wait().unwrap(); + pool.spawn(future::join_all(tasks)).wait().unwrap(); }
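
For reference, a minimal sketch of how the pieces introduced by this patch are wired together outside the gRPC server path: a dummy consensus backend with a single-member computation group, a consensus frontend driven by the worker, and both started on an executor. It assumes a `Worker` and a `Signer` constructed as in `main.rs`, that the sketch lives inside the compute crate so `self::consensus` and `self::worker` resolve, and that `ekiden_core::futures` re-exports `cpupool` as `common/src/futures.rs` does; the helper name `start_consensus` is illustrative only and not part of the change.

    // Illustrative sketch only, not part of the patch. Assumes this code lives in the
    // compute crate (like main.rs) and that a Worker and Signer already exist.
    use std::sync::Arc;

    use ekiden_consensus_base::{CommitteeNode, ConsensusBackend, Role};
    use ekiden_consensus_dummy::DummyConsensusBackend;
    use ekiden_core::futures::cpupool::CpuPool;
    use ekiden_core::signature::Signer;

    use self::consensus::{ConsensusConfiguration, ConsensusFrontend};
    use self::worker::Worker;

    fn start_consensus(worker: Arc<Worker>, signer: Arc<Signer>) -> Arc<ConsensusFrontend> {
        // Single-member computation group with ourselves as the leader, mirroring main.rs.
        let backend = Arc::new(DummyConsensusBackend::new(vec![
            CommitteeNode {
                role: Role::Leader,
                public_key: signer.get_public_key(),
            },
        ]));

        let frontend = Arc::new(ConsensusFrontend::new(
            ConsensusConfiguration {
                backend: backend.clone(),
                signer,
                max_batch_size: 1000,
                max_batch_timeout: 1000, // milliseconds; converted to Duration in ConsensusFrontend::new.
            },
            worker,
        ));

        // Any Executor works here: ComputeNode uses its GrpcExecutor, while the dummy
        // backend test uses the CpuPool adapter added to common/src/futures.rs.
        let mut executor = CpuPool::new(1);
        backend.start(&mut executor);
        frontend.start(&mut executor);

        // Contract calls produced by RPCs are queued via frontend.append_batch(calls),
        // and clients wait on a specific call with frontend.subscribe_call(call_id).
        frontend
    }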