This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Batch execution with single execution adapter #818

Merged
merged 6 commits into from
Aug 19, 2022
300 changes: 204 additions & 96 deletions executor/src/core.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion executor/src/errors.rs
Expand Up @@ -60,7 +60,7 @@ impl From<Box<bincode::ErrorKind>> for SubscriberError {
}
}

/// Trait do separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// Trait to separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// errors caused by a fault in the authority.
pub trait ExecutionStateError: std::error::Error {
/// Whether the error is due to a fault in the authority (eg. internal storage error).
Expand Down
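The two error categories described above can be sketched as follows. This is a minimal, std-only illustration: the trait name `AuthorityFault`, the method `is_authority_fault`, the `DemoError` and `Action` types, and the `react` policy are all hypothetical and not taken from the diff, which elides the trait's methods.

```rust
use std::fmt;

/// Hypothetical stand-in for `ExecutionStateError`. The method name is an
/// assumption; the diff does not show the real trait's methods.
pub trait AuthorityFault: std::error::Error {
    /// True when the authority itself is at fault (e.g. a storage failure).
    fn is_authority_fault(&self) -> bool;
}

/// Hypothetical error type with one variant per category.
#[derive(Debug, Clone, PartialEq)]
pub enum DemoError {
    /// The client sent something that cannot be executed.
    ClientError,
    /// The authority failed internally.
    ServerError,
}

impl fmt::Display for DemoError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DemoError::ClientError => write!(f, "malformed transaction"),
            DemoError::ServerError => write!(f, "something went wrong in the authority"),
        }
    }
}

impl std::error::Error for DemoError {}

impl AuthorityFault for DemoError {
    fn is_authority_fault(&self) -> bool {
        matches!(self, DemoError::ServerError)
    }
}

/// What a caller might do after one transaction (an assumed policy).
#[derive(Debug, PartialEq)]
pub enum Action {
    Continue,
    SkipTransaction,
    Halt,
}

/// Skip transactions rejected because of the client, halt on authority faults.
pub fn react<E: AuthorityFault>(result: &Result<(), E>) -> Action {
    match result {
        Ok(()) => Action::Continue,
        Err(e) if e.is_authority_fault() => Action::Halt,
        Err(_) => Action::SkipTransaction,
    }
}
```

Splitting the categories this way lets a caller keep processing after a bad client transaction while treating an internal fault as fatal.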
69 changes: 47 additions & 22 deletions executor/src/lib.rs
Expand Up @@ -26,12 +26,12 @@ use prometheus::Registry;
use serde::de::DeserializeOwned;
use std::{fmt::Debug, sync::Arc};
use store::Store;
use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tokio::{sync::watch, task::JoinHandle};
use tracing::info;
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification};
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber};

// Re-export SingleExecutor as a convenience adapter.
pub use crate::core::SingleExecutor;

/// Default inter-task channel size.
pub const DEFAULT_CHANNEL_SIZE: usize = 1_000;
Expand All @@ -44,12 +44,47 @@ pub type SerializedTransactionDigest = u64;

#[async_trait]
pub trait ExecutionState {
/// The type of the transaction to process.
type Transaction: DeserializeOwned + Send + Debug;

/// The error type to return in case something went wrong during execution.
type Error: ExecutionStateError;

/// Simple guardrail ensuring there is a single instance using the state
/// to call `handle_consensus_transaction`. Many instances may read the state,
/// or use it for other purposes.
fn ask_consensus_write_lock(&self) -> bool;

/// Tell the state that the caller instance is no longer calling
/// `handle_consensus_transaction`.
fn release_consensus_write_lock(&self);
}
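The test and node states in this PR simply return `true` from `ask_consensus_write_lock`. A stricter guardrail could look like the sketch below, assuming a single atomic flag is enough to track the one writer. The `GuardedState` type and its field name are hypothetical, and the methods are plain inherent methods rather than an implementation of the real trait.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical state holding the guard flag (not part of the PR).
#[derive(Default)]
pub struct GuardedState {
    consensus_writer_active: AtomicBool,
}

impl GuardedState {
    /// Returns true only for the first caller; later callers get false
    /// until the lock is released.
    pub fn ask_consensus_write_lock(&self) -> bool {
        self.consensus_writer_active
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Marks the state as free again so another instance may take over.
    pub fn release_consensus_write_lock(&self) {
        self.consensus_writer_active.store(false, Ordering::Release);
    }
}
```

The compare-and-exchange makes acquisition atomic, so two instances racing for the lock cannot both see `true`.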

/// Execution state that gets whole certificates and the corresponding batches
/// for execution. It is responsible for deduplication in case the same certificate
/// is re-delivered after a crash.
#[async_trait]
pub trait BatchExecutionState: ExecutionState {
/// Load the last consensus index from storage.
///
/// It should return the index from which it expects a replay, so one higher than
/// the last certificate index it successfully committed. This is so it has the
/// same semantics as `ExecutionIndices`.
async fn load_next_certificate_index(&self) -> Result<SequenceNumber, Self::Error>;

/// Execute the transactions and atomically persist the consensus index.
///
/// TODO: This function should be allowed to return a new committee to reconfigure the system.
Contributor

Reconfiguration can simply be done by sending the following message to the Primary and the Workers through the network: ReconfigureNotification::NewCommittee(new_committee).

Below is an example:

let message = PrimaryWorkerMessage::Reconfigure(ReconfigureNotification::Shutdown);
let worker_cancel_handles = worker_network
.unreliable_broadcast(addresses, &message)
.await;

Contributor Author

Thanks for the link! True, we discussed that we can send reconfiguration messages through gRPC.

I included this TODO because the current docs of handle_consensus_transaction say that it can return a new committee. That's not true, as it clearly cannot, but I thought the idea that it should do so was a good one. It captures committee transitions at the level of the ExecutionState, if they are to happen as part of transaction execution. The gRPC way seems like a technical detail that the implementors of the ExecutionState should not have to concern themselves with.

But it's up to you, I just wanted it to be consistent with the other docs.

Contributor

Cool, let's leave the TODO then (we haven't fully deployed Narwhal reconfiguration in DevNet yet, so this part may still change)

async fn handle_consensus(
&self,
consensus_output: &ConsensusOutput,
transaction_batches: Vec<Vec<SerializedTransaction>>,
) -> Result<(), Self::Error>;
}
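The deduplication contract of `BatchExecutionState` can be illustrated with a synchronous, in-memory sketch. It assumes `SequenceNumber` is a plain `u64` and that a serialized transaction is a `Vec<u8>`. The `DemoBatchState` type, its fields, and the boolean return value are hypothetical, and a real implementation would persist the index atomically with the execution results.

```rust
/// Assumption: a sequence number is a plain `u64`.
pub type SequenceNumber = u64;

/// Hypothetical in-memory state. A real one would write both fields to
/// storage in a single atomic operation.
#[derive(Default)]
pub struct DemoBatchState {
    next_certificate_index: SequenceNumber,
    executed: Vec<Vec<u8>>,
}

impl DemoBatchState {
    /// One higher than the last certificate index that was committed.
    pub fn load_next_certificate_index(&self) -> SequenceNumber {
        self.next_certificate_index
    }

    /// Returns true if the certificate was executed, false if it was a
    /// re-delivery of an already committed certificate and was skipped.
    pub fn handle_consensus(
        &mut self,
        consensus_index: SequenceNumber,
        transaction_batches: Vec<Vec<Vec<u8>>>,
    ) -> bool {
        if consensus_index < self.next_certificate_index {
            return false; // already committed, e.g. replayed after a crash
        }
        for batch in transaction_batches {
            self.executed.extend(batch);
        }
        self.next_certificate_index = consensus_index + 1;
        true
    }

    /// Number of transactions executed so far.
    pub fn executed_count(&self) -> usize {
        self.executed.len()
    }
}
```

Storing the next expected index rather than the last committed one means a fresh state starts at 0 without needing a sentinel value.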

/// Execution state that executes a single transaction at a time.
#[async_trait]
pub trait SingleExecutionState: ExecutionState {
/// The type of the transaction to process.
type Transaction: DeserializeOwned + Send + Debug;

/// The execution outcome to output.
type Outcome;

Expand All @@ -63,22 +98,15 @@ pub trait ExecutionState {
transaction: Self::Transaction,
) -> Result<Self::Outcome, Self::Error>;

/// Simple guardrail ensuring there is a single instance using the state
/// to call `handle_consensus_transaction`. Many instances may read the state,
/// or use it for other purposes.
fn ask_consensus_write_lock(&self) -> bool;

/// Tell the state that the caller instance is no longer using calling
//// `handle_consensus_transaction`.
fn release_consensus_write_lock(&self);

/// Load the last consensus index from storage.
///
/// It *must* return index that was last passed to `handle_consensus_transaction`.
async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error>;
}
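The `core.rs` diff is not rendered on this page, so the actual `SingleExecutor` is not visible here. The general idea of adapting a one-transaction-at-a-time state to batch delivery might look like the sketch below. Everything in it is hypothetical (`SingleState`, `SingleAdapter`, `Doubler`), it is synchronous, and it ignores execution indices and the output channel.

```rust
/// Hypothetical, synchronous stand-in for `SingleExecutionState`.
pub trait SingleState {
    type Transaction;
    type Outcome;
    type Error;

    fn handle_transaction(
        &mut self,
        transaction: Self::Transaction,
    ) -> Result<Self::Outcome, Self::Error>;
}

/// Hypothetical adapter: walks every batch and hands transactions to the
/// wrapped state one at a time, collecting each outcome in order.
pub struct SingleAdapter<S> {
    state: S,
}

impl<S: SingleState> SingleAdapter<S> {
    pub fn new(state: S) -> Self {
        Self { state }
    }

    pub fn handle_batches(
        &mut self,
        batches: Vec<Vec<S::Transaction>>,
    ) -> Vec<Result<S::Outcome, S::Error>> {
        batches
            .into_iter()
            .flatten()
            .map(|tx| self.state.handle_transaction(tx))
            .collect()
    }
}

/// Toy state: doubles each value and rejects 400, echoing the test
/// constant `MALFORMED_TRANSACTION`.
pub struct Doubler;

impl SingleState for Doubler {
    type Transaction = u64;
    type Outcome = u64;
    type Error = String;

    fn handle_transaction(&mut self, transaction: u64) -> Result<u64, String> {
        if transaction == 400 {
            Err("malformed transaction".to_string())
        } else {
            Ok(transaction * 2)
        }
    }
}
```

In this sketch a rejected transaction does not stop the rest of the batch; whether the real adapter behaves that way for every error category cannot be confirmed from this page.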

/// The output of the executor.
pub type ExecutorOutput<State> = (
SubscriberResult<<State as ExecutionState>::Outcome>,
SubscriberResult<<State as SingleExecutionState>::Outcome>,
SerializedTransaction,
);

Expand All @@ -92,13 +120,11 @@ impl Executor {
execution_state: Arc<State>,
tx_reconfigure: &watch::Sender<ReconfigureNotification>,
rx_consensus: metered_channel::Receiver<ConsensusOutput>,
tx_output: Sender<ExecutorOutput<State>>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
registry: &Registry,
) -> SubscriberResult<Vec<JoinHandle<()>>>
where
State: ExecutionState + Send + Sync + 'static,
State::Outcome: Send + 'static,
State: BatchExecutionState + Send + Sync + 'static,
State::Error: Debug,
{
let metrics = ExecutorMetrics::new(registry);
Expand Down Expand Up @@ -127,7 +153,6 @@ impl Executor {
execution_state,
tx_reconfigure.subscribe(),
/* rx_subscriber */ rx_executor,
tx_output,
);

// Return the handle.
Expand Down
42 changes: 23 additions & 19 deletions executor/src/tests/execution_state.rs
@@ -1,6 +1,6 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{ExecutionIndices, ExecutionState, ExecutionStateError};
use crate::{ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState};
use async_trait::async_trait;
use consensus::ConsensusOutput;

Expand All @@ -14,14 +14,14 @@ use store::{
use thiserror::Error;

/// A malformed transaction.
pub const MALFORMED_TRANSACTION: <TestState as ExecutionState>::Transaction = 400;
pub const MALFORMED_TRANSACTION: <TestState as SingleExecutionState>::Transaction = 400;

/// A special transaction that makes the executor engine crash.
pub const KILLER_TRANSACTION: <TestState as ExecutionState>::Transaction = 500;
pub const KILLER_TRANSACTION: <TestState as SingleExecutionState>::Transaction = 500;

/// A dumb execution state for testing.
pub struct TestState {
store: Store<u64, ExecutionIndices>,
indices_store: Store<u64, ExecutionIndices>,
}

impl std::fmt::Debug for TestState {
Expand All @@ -38,8 +38,18 @@ impl Default for TestState {

#[async_trait]
impl ExecutionState for TestState {
type Transaction = u64;
type Error = TestStateError;

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}
}

#[async_trait]
impl SingleExecutionState for TestState {
type Transaction = u64;
type Outcome = Vec<u8>;

async fn handle_consensus_transaction(
Expand All @@ -53,22 +63,16 @@ impl ExecutionState for TestState {
} else if transaction == KILLER_TRANSACTION {
Err(Self::Error::ServerError)
} else {
self.store
self.indices_store
.write(Self::INDICES_ADDRESS, execution_indices)
.await;
Ok(Vec::default())
}
}

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}

async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
let indices = self
.store
.indices_store
.read(Self::INDICES_ADDRESS)
.await
.unwrap()
Expand All @@ -83,21 +87,21 @@ impl TestState {

/// Create a new test state.
pub fn new(store_path: &Path) -> Self {
const STATE_CF: &str = "test_state";
let rocksdb = open_cf(store_path, None, &[STATE_CF]).unwrap();
let map = reopen!(&rocksdb, STATE_CF;<u64, ExecutionIndices>);
const INDICES_CF: &str = "test_state_indices";
let rocksdb = open_cf(store_path, None, &[INDICES_CF]).unwrap();
let indices_map = reopen!(&rocksdb, INDICES_CF;<u64, ExecutionIndices>);
Self {
store: Store::new(map),
indices_store: Store::new(indices_map),
}
}

/// Load the execution indices; ie. the state.
/// Load the execution indices.
pub async fn get_execution_indices(&self) -> ExecutionIndices {
self.load_execution_indices().await.unwrap()
}
}

#[derive(Debug, Error)]
#[derive(Debug, Error, Clone)]
pub enum TestStateError {
#[error("Something went wrong in the authority")]
ServerError,
Expand Down
40 changes: 17 additions & 23 deletions executor/src/tests/executor_tests.rs
Expand Up @@ -23,12 +23,11 @@ async fn execute_transactions() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed certificates to the mock sequencer and add the transaction data to storage (as if
Expand Down Expand Up @@ -70,20 +69,19 @@ async fn execute_empty_certificate() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed empty certificates to the executor.
let empty_certificates = 2;
for _ in 0..empty_certificates {
for i in 0..empty_certificates {
let message = ConsensusOutput {
certificate: Certificate::default(),
consensus_index: SequenceNumber::default(),
consensus_index: i,
};
tx_executor.send(message).await.unwrap();
}
Expand Down Expand Up @@ -126,12 +124,11 @@ async fn execute_malformed_transactions() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed a malformed transaction to the mock sequencer
Expand Down Expand Up @@ -188,19 +185,18 @@ async fn internal_error_execution() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed a 'killer' transaction to the executor. This is a special test transaction that
// crashes the test executor engine.
let tx00 = 10;
let tx01 = 11;
let tx10 = 12;
let tx00 = 10u64;
let tx01 = 11u64;
let tx10 = 12u64;
let tx11 = KILLER_TRANSACTION;

let (digest_0, batch_0) = test_batch(vec![tx00, tx01]);
Expand Down Expand Up @@ -240,12 +236,11 @@ async fn crash_recovery() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed two certificates with good transactions to the executor.
Expand Down Expand Up @@ -295,12 +290,11 @@ async fn crash_recovery() {
let (tx_output, mut rx_output) = channel(10);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(reconfigure_notification);

let _core_handle = Core::<TestState>::spawn(
let _core_handle = Core::spawn(
store.clone(),
execution_state.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed two certificates with good transactions to the executor.
Expand Down
1 change: 0 additions & 1 deletion executor/tests/consensus_integration_tests.rs
Expand Up @@ -52,7 +52,6 @@ async fn test_internal_consensus_output() {

assert!(result.0.is_ok());

// deserialise transaction
let output_transaction = bincode::deserialize::<String>(&result.1).unwrap();

// we always remove the first transaction and check with the one
Expand Down
22 changes: 13 additions & 9 deletions node/src/execution_state.rs
Expand Up @@ -2,16 +2,26 @@
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use consensus::ConsensusOutput;
use executor::{ExecutionIndices, ExecutionState, ExecutionStateError};
use executor::{ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState};
use thiserror::Error;

/// A simple/dumb execution engine.
pub struct SimpleExecutionState;

#[async_trait]
impl ExecutionState for SimpleExecutionState {
type Transaction = String;
type Error = SimpleExecutionError;

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}
}

#[async_trait]
impl SingleExecutionState for SimpleExecutionState {
type Transaction = String;
type Outcome = Vec<u8>;

async fn handle_consensus_transaction(
Expand All @@ -23,12 +33,6 @@ impl ExecutionState for SimpleExecutionState {
Ok(Vec::default())
}

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}

async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
Ok(ExecutionIndices::default())
}
Expand All @@ -41,7 +45,7 @@ impl Default for SimpleExecutionState {
}

/// A simple/dumb execution error.
#[derive(Debug, Error)]
#[derive(Debug, Error, Clone)]
pub enum SimpleExecutionError {
#[error("Something went wrong in the authority")]
ServerError,
Expand Down