Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Batch execution with single execution adapter #818

Merged
merged 6 commits into from
Aug 19, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 189 additions & 80 deletions executor/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ use crate::{
bail,
errors::{SubscriberError, SubscriberResult},
state::ExecutionIndices,
ExecutionState, ExecutorOutput, SerializedTransaction,
BatchExecutionState, ExecutionState, ExecutorOutput, SingleExecutionState,
};
use async_trait::async_trait;
use consensus::ConsensusOutput;
use futures::lock::Mutex;
use std::{fmt::Debug, sync::Arc};
use store::Store;
use tokio::{
Expand All @@ -33,10 +35,6 @@ pub struct Core<State: ExecutionState> {
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
/// Receive ordered consensus output to execute.
rx_subscriber: metered_channel::Receiver<ConsensusOutput>,
/// Outputs executed transactions.
tx_output: Sender<ExecutorOutput<State>>,
/// The indices ensuring we do not execute twice the same transaction.
execution_indices: ExecutionIndices,
}

impl<State: ExecutionState> Drop for Core<State> {
Expand All @@ -47,8 +45,7 @@ impl<State: ExecutionState> Drop for Core<State> {

impl<State> Core<State>
where
State: ExecutionState + Send + Sync + 'static,
State::Outcome: Send + 'static,
State: BatchExecutionState + Send + Sync + 'static,
State::Error: Debug,
{
/// Spawn a new executor in a dedicated tokio task.
Expand All @@ -58,20 +55,13 @@ where
execution_state: Arc<State>,
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_subscriber: metered_channel::Receiver<ConsensusOutput>,
tx_output: Sender<ExecutorOutput<State>>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let execution_indices = execution_state
.load_execution_indices()
.await
.expect("Failed to load execution indices from store");
Self {
store,
execution_state,
rx_reconfigure,
rx_subscriber,
tx_output,
execution_indices,
}
.run()
.await
Expand All @@ -81,6 +71,14 @@ where

/// Main loop listening to new certificates and execute them.
async fn run(&mut self) -> SubscriberResult<()> {
let _next_certificate_index = self
.execution_state
.load_next_certificate_index()
.await
.expect("Failed to load execution indices from store");

// TODO: Replay certificates from the store.

loop {
tokio::select! {
// Execute all transactions associated with the consensus output message.
Expand All @@ -106,35 +104,41 @@ where

/// Execute a single certificate.
async fn execute_certificate(&mut self, message: &ConsensusOutput) -> SubscriberResult<()> {
// Skip the certificate if it contains no transactions.
if message.certificate.header.payload.is_empty() {
self.execution_indices.skip_certificate();
return Ok(());
// Collect all transactions in all the batches.
let mut batches = Vec::new();

for batch_digest in message.certificate.header.payload.keys() {
batches.push(self.collect_batch(batch_digest).await?);
aakoshh marked this conversation as resolved.
Show resolved Hide resolved
}

// Execute every batch in the certificate.
let total_batches = message.certificate.header.payload.len();
for (index, digest) in message.certificate.header.payload.keys().enumerate() {
// Skip batches that we already executed (after crash-recovery).
if self
.execution_indices
.check_next_batch_index(index as SequenceNumber)
{
self.execute_batch(message, *digest, total_batches).await?;
let result = self
.execution_state
.handle_consensus(message, batches)
.await
.map_err(SubscriberError::from);

match result {
Ok(()) => Ok(()),
Err(error @ SubscriberError::ClientExecutionError(_)) => {
// We may want to log the errors that are the user's fault (i.e., that are neither
// our fault or the fault of consensus) for debug purposes. It is safe to continue
// by ignoring those transactions since all honest subscribers will do the same.
debug!("{error}");
Ok(())
}
Err(error) => {
bail!(error)
}
}
Ok(())
}

/// Execute a single batch of transactions.
async fn execute_batch(
/// Collect all transactions in a batch
async fn collect_batch(
&mut self,
consensus_output: &ConsensusOutput,
batch_digest: BatchDigest,
total_batches: usize,
) -> SubscriberResult<()> {
batch_digest: &BatchDigest,
) -> SubscriberResult<Vec<State::Transaction>> {
// The store should now hold all transaction data referenced by the input certificate.
let transactions = match self.store.read(batch_digest).await? {
let transactions = match self.store.read(*batch_digest).await? {
Some(x) => x.0,
None => {
// If two certificates contain the exact same batch (eg. by the actions of a Byzantine
Expand All @@ -143,54 +147,175 @@ where
// the second batch since there is no point in executing twice the same transactions
// (as the second execution attempt will always fail).
debug!("Duplicate batch {batch_digest}");
self.execution_indices.skip_batch(total_batches);
return Ok(());
return Ok(Vec::new());
}
};

// The consensus simply orders bytes, so we first need to deserialize the transaction.
// If the deserialization fail it is safe to ignore the transaction since all correct
// clients will do the same. Remember that a bad authority or client may input random
// bytes to the consensus.
let transactions = transactions
.into_iter()
.map(
|serialized| match bincode::deserialize::<State::Transaction>(&serialized) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is annoying we can also do the deserialisation of transactions without the ExecutorState?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean to push the deserialization into the ExecutorState implementation itself?

Like I said I think letting the system know about the Transaction type is a good thing, it keeps your options open for hardening in the future by rejecting malicious input at the peripheries.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I highlighted another change in the PR description, which is that tx_output will only contain the outcome for transactions that we managed to at least deserialize (previously it had raw bytes, now it's the model). It doesn't look like this would be a problem though, because you weren't able to tell the difference in the errors anyway, so you didn't know whether the raw bytes were legit without trying to deserialize again.

Let me know, though, because pushing deserialization into the state itself could restore full freedom.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving it out would also put the decision of what serialization to use back with the application, ie. bincode would not be prescribed by this library. But you can achieve this by adding your own traits as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed a change which moves the deserialisation into the SingleExecutor, to restore the current behaviour of reporting the outcome of all transactions, even if they are malformed. The BatchExecutionState now takes raw transactions, and it's up to the application to decide how much of it to process, and what deserialisation scheme to use.

Ok(x) => Ok(x),
Err(e) => Err(SubscriberError::ClientExecutionError(format!(
"Failed to deserialize transaction: {e}"
))),
},
)
.collect::<Result<Vec<_>, _>>()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we collect a Vec rather than a Result so that we can skip faulty transactions rather than the whole batch?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this while debugging one of the unit tests that failed to deserialise (the compiler decided to send i32 instead of u64). I think this is a bit of a grey area. I see that you label the error as ClientExecutionError which, during execution, is treated as a non-fatal one. However, it's not clear whether treating deserialisation errors as non-fatal would be correct.

That's because it's difficult to say why we can't deserialise a transaction: Is it because some malicious validator put a malformed message into one of the batches? Or is it because the Transaction type on our side is somehow incorrect, and our machine is the only one that fails to deserialize something? Perhaps other validators have added an extra variant to an enum, but we forgot to update our node? Skipping such transactions could lead to consensus failure down the road.

What we should do depends on how much pre-validation happens on the contents before quorum is reached over their availability. Can malicious validators use Narwhal to atomically broadcast absolute rubbish content? Or is there some vetting before we vote on it to see if it at least conforms to some basic expectations about format?

If there is such validation, then we could be sure that the honest majority thinks this transaction looks legit, so if we can't deserialize it, we should stop and fix our software. If there is no such validation, then we can't decide whose fault it is, ours or the batcher's, in which case I don't know what to do.

With that in mind, do you want to just issue a warning and skip?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed it back to skipping.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On further thought, changed it to return raw bytes at this stage.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good point. Validators can input any rubbish as transactions, there are no checks on transactions format upon voting. Before voting, validators simply verify that the payload is available (and that the header doesn't break any safety or GC rules).

You are right that if the transaction serialisation format changes and some validators are late to the party, then it may be a safety problem. Should we open an issue for that and see what others think about it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opening an issue can't hurt, at least there would be a story about it. Someone using this library might want to protect themselves from the clients sending invalid content. If it's encoded into a trait, anyone happy with raw bytes can trivially allow everything in.


Ok(transactions)
}
}

/// Executor that feeds transactions one by one to the execution state.
pub struct SingleExecutor<State>
where
State: SingleExecutionState,
{
/// The (global) state to perform execution.
execution_state: Arc<State>,
/// The indices ensuring we do not execute twice the same transaction.
execution_indices: Mutex<ExecutionIndices>,
asonnino marked this conversation as resolved.
Show resolved Hide resolved
/// Outputs executed transactions.
tx_output: Sender<ExecutorOutput<State>>,
}

// `ExecutionState` implementation for `SingleExecutor`: every item is taken from, or forwarded
// to, the wrapped `execution_state`; the adapter adds no logic of its own here.
#[async_trait]
impl<State> ExecutionState for SingleExecutor<State>
where
State: SingleExecutionState + Sync + Send + 'static,
State::Outcome: Sync + Send + 'static,
State::Error: Sync + Send + 'static,
{
// Both associated types are re-exported unchanged from the wrapped state.
type Transaction = State::Transaction;
type Error = State::Error;

/// Forwards to `ask_consensus_write_lock` on the wrapped state and returns its answer.
fn ask_consensus_write_lock(&self) -> bool {
self.execution_state.ask_consensus_write_lock()
}

/// Forwards to `release_consensus_write_lock` on the wrapped state.
fn release_consensus_write_lock(&self) {
self.execution_state.release_consensus_write_lock()
}
}

// `BatchExecutionState` implementation for `SingleExecutor`: receives the batches of a consensus
// output and hands them, one batch at a time, to `execute_batch`, while tracking progress in the
// mutex-protected `execution_indices`.
#[async_trait]
impl<State> BatchExecutionState for SingleExecutor<State>
where
State: SingleExecutionState + Sync + Send + 'static,
State::Outcome: Sync + Send + 'static,
State::Error: Clone + Sync + Send + 'static,
State::Transaction: Clone + Send + Sync + 'static,
{
/// Loads the execution indices from the wrapped state, overwrites this executor's
/// `execution_indices` with them, and returns their `next_certificate_index`.
///
/// An error from `load_execution_indices` is returned to the caller unchanged.
async fn load_next_certificate_index(&self) -> Result<SequenceNumber, Self::Error> {
let indices = self.execution_state.load_execution_indices().await?;
let mut execution_indices = self.execution_indices.lock().await;
*execution_indices = indices;
Ok(execution_indices.next_certificate_index)
}

/// Processes the transaction batches belonging to one consensus output.
///
/// With no batches at all, the certificate is skipped via `skip_certificate`. Otherwise each
/// batch whose position passes `check_next_batch_index` is passed to `execute_batch`; the
/// first error returned by `execute_batch` aborts the loop and is returned to the caller.
async fn handle_consensus(
&self,
consensus_output: &ConsensusOutput,
transaction_batches: Vec<Vec<State::Transaction>>,
) -> Result<(), Self::Error> {
// The guard is held for the whole call, including across the `.await` on `execute_batch`.
let mut execution_indices = self.execution_indices.lock().await;

if transaction_batches.is_empty() {
execution_indices.skip_certificate();
} else {
// Execute every batch in the certificate.
let total_batches = transaction_batches.len();
for (index, batch) in transaction_batches.into_iter().enumerate() {
// Skip batches that we already executed (after crash-recovery).
if execution_indices.check_next_batch_index(index as SequenceNumber) {
self.execute_batch(
&mut execution_indices,
consensus_output,
batch,
total_batches,
)
.await?;
}
}
}
Ok(())
}
}

impl<State> SingleExecutor<State>
where
State: SingleExecutionState,
State::Transaction: Clone,
State::Error: Clone,
State::Outcome: Sync + Send + 'static,
{
pub fn new(execution_state: Arc<State>, tx_output: Sender<ExecutorOutput<State>>) -> Arc<Self> {
Arc::new(Self {
execution_state,
execution_indices: Mutex::new(ExecutionIndices::default()),
tx_output,
})
}

/// Execute a single batch of transactions.
async fn execute_batch(
&self,
execution_indices: &mut ExecutionIndices,
consensus_output: &ConsensusOutput,
transactions: Vec<State::Transaction>,
total_batches: usize,
) -> Result<(), State::Error> {
if transactions.is_empty() {
execution_indices.skip_batch(total_batches);
return Ok(());
}

// Execute every transaction in the batch.
let total_transactions = transactions.len();
for (index, transaction) in transactions.into_iter().enumerate() {
// Skip transactions that we already executed (after crash-recovery).
if self
.execution_indices
.check_next_transaction_index(index as SequenceNumber)
{
if execution_indices.check_next_transaction_index(index as SequenceNumber) {
// Execute the transaction
let result = self
.execute_transaction(
execution_indices,
consensus_output,
transaction.clone(),
total_transactions,
total_batches,
)
.await;

let (bail, result) = match result {
outcome @ Ok(..) => (None, outcome),

// We may want to log the errors that are the user's fault (i.e., that are neither
// our fault or the fault of consensus) for debug purposes. It is safe to continue
// by ignoring those transactions since all honest subscribers will do the same.
Err(error @ SubscriberError::ClientExecutionError(_)) => {
debug!("{error}");
(None, Err(error))
}

// We must take special care to errors that are our fault, such as storage errors.
// We may be the only authority experiencing it, and thus cannot continue to process
// transactions until the problem is fixed.
Err(error) => (Some(error.clone()), Err(error)),
let (fatal, outcome) = match result {
Ok(outcome) => (None, Ok(outcome)),
Err(error) => match SubscriberError::from(error.clone()) {
// We may want to log the errors that are the user's fault (i.e., that are neither
// our fault or the fault of consensus) for debug purposes. It is safe to continue
// by ignoring those transactions since all honest subscribers will do the same.
e @ SubscriberError::ClientExecutionError(_) => {
debug!("{e}");
(None, Err(e))
}
// We must take special care to errors that are our fault, such as storage errors.
// We may be the only authority experiencing it, and thus cannot continue to process
// transactions until the problem is fixed.
e => (Some(error), Err(e)),
},
};

// Output the result (eg. to notify the end-user);
let output = (result, transaction);
let output = (outcome, transaction);
if self.tx_output.send(output).await.is_err() {
debug!("No users listening for transaction execution");
}

// Bail if a fatal error occurred.
if let Some(e) = bail {
if let Some(e) = fatal {
bail!(e);
}
}
Expand All @@ -200,37 +325,21 @@ where

/// Execute a single transaction.
async fn execute_transaction(
&mut self,
&self,
execution_indices: &mut ExecutionIndices,
consensus_output: &ConsensusOutput,
serialized: SerializedTransaction,
transaction: State::Transaction,
total_transactions: usize,
total_batches: usize,
) -> SubscriberResult<<State as ExecutionState>::Outcome> {
) -> Result<State::Outcome, State::Error> {
// Compute the next expected indices. Those will be persisted upon transaction execution
// and are only used for crash-recovery.
self.execution_indices
.next(total_batches, total_transactions);

// The consensus simply orders bytes, so we first need to deserialize the transaction.
// If the deserialization fail it is safe to ignore the transaction since all correct
// clients will do the same. Remember that a bad authority or client may input random
// bytes to the consensus.
let transaction: State::Transaction = match bincode::deserialize(&serialized) {
Ok(x) => x,
Err(e) => bail!(SubscriberError::ClientExecutionError(format!(
"Failed to deserialize transaction: {e}"
))),
};
execution_indices.next(total_batches, total_transactions);

// Execute the transaction. Note that the executor will need to choose whether to discard
// transactions from previous epochs by itself.
self.execution_state
.handle_consensus_transaction(
consensus_output,
self.execution_indices.clone(),
transaction,
)
.handle_consensus_transaction(consensus_output, execution_indices.clone(), transaction)
.await
.map_err(SubscriberError::from)
}
}
2 changes: 1 addition & 1 deletion executor/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl From<Box<bincode::ErrorKind>> for SubscriberError {
}
}

/// Trait do separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// Trait to separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// errors caused by a fault in the authority.
pub trait ExecutionStateError: std::error::Error {
/// Whether the error is due to a fault in the authority (eg. internal storage error).
Expand Down
Loading