Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Commit

Permalink
revert: 0305170 - Batch execution with single execution adapter (#818)
Browse files Browse the repository at this point in the history
  • Loading branch information
huitseeker committed Aug 29, 2022
1 parent 0a78e0a commit 2c58436
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 342 deletions.
300 changes: 96 additions & 204 deletions executor/src/core.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion executor/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl From<Box<bincode::ErrorKind>> for SubscriberError {
}
}

/// Trait to separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// Trait do separate execution errors in two categories: (i) errors caused by a bad client, (ii)
/// errors caused by a fault in the authority.
pub trait ExecutionStateError: std::error::Error {
/// Whether the error is due to a fault in the authority (eg. internal storage error).
Expand Down
69 changes: 22 additions & 47 deletions executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ use prometheus::Registry;
use serde::de::DeserializeOwned;
use std::{fmt::Debug, sync::Arc};
use store::Store;
use tokio::{sync::watch, task::JoinHandle};
use tokio::{
sync::{mpsc::Sender, watch},
task::JoinHandle,
};
use tracing::info;
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber};

// Re-export SingleExecutor as a convenience adapter.
pub use crate::core::SingleExecutor;
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification};

/// Convenience type representing a serialized transaction.
pub type SerializedTransaction = Vec<u8>;
Expand All @@ -41,47 +41,12 @@ pub type SerializedTransactionDigest = u64;

#[async_trait]
pub trait ExecutionState {
/// The error type to return in case something went wrong during execution.
type Error: ExecutionStateError;

/// Simple guardrail ensuring there is a single instance using the state
/// to call `handle_consensus_transaction`. Many instances may read the state,
/// or use it for other purposes.
fn ask_consensus_write_lock(&self) -> bool;

/// Tell the state that the caller instance is no longer using calling
//// `handle_consensus_transaction`.
fn release_consensus_write_lock(&self);
}

/// Execution state that gets whole certificates and the corresponding batches
/// for execution. It is responsible for deduplication in case the same certificate
/// is re-delivered after a crash.
#[async_trait]
pub trait BatchExecutionState: ExecutionState {
/// Load the last consensus index from storage.
///
/// It should return the index from which it expects a replay, so one higher than
/// the last certificate index it successfully committed. This is so it has the
/// same semantics as `ExecutionIndices`.
async fn load_next_certificate_index(&self) -> Result<SequenceNumber, Self::Error>;

/// Execute the transactions and atomically persist the consensus index.
///
/// TODO: This function should be allowed to return a new committee to reconfigure the system.
async fn handle_consensus(
&self,
consensus_output: &ConsensusOutput,
transaction_batches: Vec<Vec<SerializedTransaction>>,
) -> Result<(), Self::Error>;
}

/// Execution state that executes a single transaction at a time.
#[async_trait]
pub trait SingleExecutionState: ExecutionState {
/// The type of the transaction to process.
type Transaction: DeserializeOwned + Send + Debug;

/// The error type to return in case something went wrong during execution.
type Error: ExecutionStateError;

/// The execution outcome to output.
type Outcome;

Expand All @@ -95,15 +60,22 @@ pub trait SingleExecutionState: ExecutionState {
transaction: Self::Transaction,
) -> Result<Self::Outcome, Self::Error>;

/// Simple guardrail ensuring there is a single instance using the state
/// to call `handle_consensus_transaction`. Many instances may read the state,
/// or use it for other purposes.
fn ask_consensus_write_lock(&self) -> bool;

/// Tell the state that the caller instance is no longer using calling
//// `handle_consensus_transaction`.
fn release_consensus_write_lock(&self);

/// Load the last consensus index from storage.
///
/// It *must* return index that was last passed to `handle_consensus_transaction`.
async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error>;
}

/// The output of the executor.
pub type ExecutorOutput<State> = (
SubscriberResult<<State as SingleExecutionState>::Outcome>,
SubscriberResult<<State as ExecutionState>::Outcome>,
SerializedTransaction,
);

Expand All @@ -117,11 +89,13 @@ impl Executor {
execution_state: Arc<State>,
tx_reconfigure: &watch::Sender<ReconfigureNotification>,
rx_consensus: metered_channel::Receiver<ConsensusOutput>,
tx_output: Sender<ExecutorOutput<State>>,
tx_get_block_commands: metered_channel::Sender<BlockCommand>,
registry: &Registry,
) -> SubscriberResult<Vec<JoinHandle<()>>>
where
State: BatchExecutionState + Send + Sync + 'static,
State: ExecutionState + Send + Sync + 'static,
State::Outcome: Send + 'static,
State::Error: Debug,
{
let metrics = ExecutorMetrics::new(registry);
Expand Down Expand Up @@ -154,6 +128,7 @@ impl Executor {
execution_state,
tx_reconfigure.subscribe(),
/* rx_subscriber */ rx_executor,
tx_output,
);

// Return the handle.
Expand Down
42 changes: 19 additions & 23 deletions executor/src/tests/execution_state.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState};
use crate::{ExecutionIndices, ExecutionState, ExecutionStateError};
use async_trait::async_trait;
use consensus::ConsensusOutput;

Expand All @@ -14,14 +14,14 @@ use store::{
use thiserror::Error;

/// A malformed transaction.
pub const MALFORMED_TRANSACTION: <TestState as SingleExecutionState>::Transaction = 400;
pub const MALFORMED_TRANSACTION: <TestState as ExecutionState>::Transaction = 400;

/// A special transaction that makes the executor engine crash.
pub const KILLER_TRANSACTION: <TestState as SingleExecutionState>::Transaction = 500;
pub const KILLER_TRANSACTION: <TestState as ExecutionState>::Transaction = 500;

/// A dumb execution state for testing.
pub struct TestState {
indices_store: Store<u64, ExecutionIndices>,
store: Store<u64, ExecutionIndices>,
}

impl std::fmt::Debug for TestState {
Expand All @@ -38,18 +38,8 @@ impl Default for TestState {

#[async_trait]
impl ExecutionState for TestState {
type Error = TestStateError;

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}
}

#[async_trait]
impl SingleExecutionState for TestState {
type Transaction = u64;
type Error = TestStateError;
type Outcome = Vec<u8>;

async fn handle_consensus_transaction(
Expand All @@ -63,16 +53,22 @@ impl SingleExecutionState for TestState {
} else if transaction == KILLER_TRANSACTION {
Err(Self::Error::ServerError)
} else {
self.indices_store
self.store
.write(Self::INDICES_ADDRESS, execution_indices)
.await;
Ok(Vec::default())
}
}

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}

async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
let indices = self
.indices_store
.store
.read(Self::INDICES_ADDRESS)
.await
.unwrap()
Expand All @@ -87,21 +83,21 @@ impl TestState {

/// Create a new test state.
pub fn new(store_path: &Path) -> Self {
const INDICES_CF: &str = "test_state_indices";
let rocksdb = open_cf(store_path, None, &[INDICES_CF]).unwrap();
let indices_map = reopen!(&rocksdb, INDICES_CF;<u64, ExecutionIndices>);
const STATE_CF: &str = "test_state";
let rocksdb = open_cf(store_path, None, &[STATE_CF]).unwrap();
let map = reopen!(&rocksdb, STATE_CF;<u64, ExecutionIndices>);
Self {
indices_store: Store::new(indices_map),
store: Store::new(map),
}
}

/// Load the execution indices.
/// Load the execution indices; ie. the state.
pub async fn get_execution_indices(&self) -> ExecutionIndices {
self.load_execution_indices().await.unwrap()
}
}

#[derive(Debug, Error, Clone)]
#[derive(Debug, Error)]
pub enum TestStateError {
#[error("Something went wrong in the authority")]
ServerError,
Expand Down
40 changes: 23 additions & 17 deletions executor/src/tests/executor_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ async fn execute_transactions() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed certificates to the mock sequencer and add the transaction data to storage (as if
Expand Down Expand Up @@ -69,19 +70,20 @@ async fn execute_empty_certificate() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed empty certificates to the executor.
let empty_certificates = 2;
for i in 0..empty_certificates {
for _ in 0..empty_certificates {
let message = ConsensusOutput {
certificate: Certificate::default(),
consensus_index: i,
consensus_index: SequenceNumber::default(),
};
tx_executor.send(message).await.unwrap();
}
Expand Down Expand Up @@ -124,11 +126,12 @@ async fn execute_malformed_transactions() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed a malformed transaction to the mock sequencer
Expand Down Expand Up @@ -185,18 +188,19 @@ async fn internal_error_execution() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_hanlde = Core::spawn(
let _core_hanlde = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed a 'killer' transaction to the executor. This is a special test transaction that
// crashes the test executor engine.
let tx00 = 10u64;
let tx01 = 11u64;
let tx10 = 12u64;
let tx00 = 10;
let tx01 = 11;
let tx10 = 12;
let tx11 = KILLER_TRANSACTION;

let (digest_0, batch_0) = test_batch(vec![tx00, tx01]);
Expand Down Expand Up @@ -236,11 +240,12 @@ async fn crash_recovery() {
// Spawn the executor.
let store = test_store();
let execution_state = Arc::new(TestState::default());
let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed two certificates with good transactions to the executor.
Expand Down Expand Up @@ -290,11 +295,12 @@ async fn crash_recovery() {
let (tx_output, mut rx_output) = channel(10);
let (_tx_reconfigure, rx_reconfigure) = watch::channel(reconfigure_notification);

let _core_handle = Core::spawn(
let _core_handle = Core::<TestState>::spawn(
store.clone(),
SingleExecutor::new(execution_state.clone(), tx_output),
execution_state.clone(),
rx_reconfigure,
/* rx_subscriber */ rx_executor,
tx_output,
);

// Feed two certificates with good transactions to the executor.
Expand Down
1 change: 1 addition & 0 deletions executor/tests/consensus_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async fn test_internal_consensus_output() {

assert!(result.0.is_ok());

// deserialise transaction
let output_transaction = bincode::deserialize::<String>(&result.1).unwrap();

// we always remove the first transaction and check with the one
Expand Down
22 changes: 9 additions & 13 deletions node/src/execution_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,16 @@
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use consensus::ConsensusOutput;
use executor::{ExecutionIndices, ExecutionState, ExecutionStateError, SingleExecutionState};
use executor::{ExecutionIndices, ExecutionState, ExecutionStateError};
use thiserror::Error;

/// A simple/dumb execution engine.
pub struct SimpleExecutionState;

#[async_trait]
impl ExecutionState for SimpleExecutionState {
type Error = SimpleExecutionError;

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}
}

#[async_trait]
impl SingleExecutionState for SimpleExecutionState {
type Transaction = String;
type Error = SimpleExecutionError;
type Outcome = Vec<u8>;

async fn handle_consensus_transaction(
Expand All @@ -33,6 +23,12 @@ impl SingleExecutionState for SimpleExecutionState {
Ok(Vec::default())
}

fn ask_consensus_write_lock(&self) -> bool {
true
}

fn release_consensus_write_lock(&self) {}

async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
Ok(ExecutionIndices::default())
}
Expand All @@ -45,7 +41,7 @@ impl Default for SimpleExecutionState {
}

/// A simple/dumb execution error.
#[derive(Debug, Error, Clone)]
#[derive(Debug, Error)]
pub enum SimpleExecutionError {
#[error("Something went wrong in the authority")]
ServerError,
Expand Down
Loading

0 comments on commit 2c58436

Please sign in to comment.