From f8d79f68d605025709d43c8c71134fe56ddd869d Mon Sep 17 00:00:00 2001 From: Arun Koshy <97870774+arun-koshy@users.noreply.github.com> Date: Fri, 26 May 2023 16:47:45 -0700 Subject: [PATCH] Move protocol config to NarwhalManager start... Remove epoch from BatchV2 --- crates/sui-core/src/consensus_validator.rs | 20 ++---- crates/sui-core/src/narwhal_manager/mod.rs | 50 ++++++++------- .../src/unit_tests/narwhal_manager_tests.rs | 5 +- crates/sui-node/src/lib.rs | 16 ++--- narwhal/node/src/main.rs | 2 +- narwhal/node/src/primary_node.rs | 8 +-- narwhal/node/src/worker_node.rs | 11 +--- narwhal/node/tests/node_test.rs | 24 +++---- narwhal/node/tests/staged/narwhal.yaml | 1 - .../tests/block_synchronizer_tests.rs | 27 +++----- .../block_synchronizer/tests/handler_tests.rs | 42 +++---------- .../primary/src/tests/block_remover_tests.rs | 24 ++----- .../primary/src/tests/block_waiter_tests.rs | 20 ++---- .../src/tests/certificate_fetcher_tests.rs | 7 +-- narwhal/primary/src/tests/primary_tests.rs | 25 +------- narwhal/primary/src/tests/proposer_tests.rs | 7 +-- .../primary/src/tests/synchronizer_tests.rs | 12 +--- .../tests/integration_tests_validator_api.rs | 8 +-- narwhal/storage/src/certificate_store.rs | 3 +- narwhal/storage/src/payload_store.rs | 2 +- narwhal/storage/src/proposer_store.rs | 6 +- narwhal/test-utils/src/cluster.rs | 2 +- narwhal/test-utils/src/lib.rs | 49 ++++----------- narwhal/types/benches/batch_digest.rs | 1 - narwhal/types/src/primary.rs | 63 +++---------------- narwhal/worker/src/batch_fetcher.rs | 42 ++++++------- narwhal/worker/src/batch_maker.rs | 11 ++-- narwhal/worker/src/tests/batch_maker_tests.rs | 6 +- narwhal/worker/src/tests/handlers_tests.rs | 10 +-- .../worker/src/tests/quorum_waiter_tests.rs | 4 +- narwhal/worker/src/tests/worker_tests.rs | 6 +- narwhal/worker/src/worker.rs | 1 - 32 files changed, 149 insertions(+), 366 deletions(-) diff --git a/crates/sui-core/src/consensus_validator.rs b/crates/sui-core/src/consensus_validator.rs index 872c55aac1a471..d9e38e6423da21 100644 --- a/crates/sui-core/src/consensus_validator.rs +++ b/crates/sui-core/src/consensus_validator.rs @@ -213,11 +213,7 @@ mod tests { }) .collect(); - let batch = Batch::new( - transaction_bytes, - latest_protocol_config, - validator.epoch_store.epoch(), - ); + let batch = Batch::new(transaction_bytes, latest_protocol_config); let res_batch = validator .validate_batch(&batch, latest_protocol_config) .await; @@ -235,11 +231,7 @@ mod tests { }) .collect(); - let batch = Batch::new( - bogus_transaction_bytes, - latest_protocol_config, - validator.epoch_store.epoch(), - ); + let batch = Batch::new(bogus_transaction_bytes, latest_protocol_config); let res_batch = validator .validate_batch(&batch, latest_protocol_config) .await; @@ -248,7 +240,7 @@ mod tests { // TODO: Remove once we have upgraded to protocol version 12. // protocol version 11 should only support BatchV1 let protocol_config_v11 = &get_protocol_config(11); - let batch_v1 = Batch::new(vec![], protocol_config_v11, validator.epoch_store.epoch()); + let batch_v1 = Batch::new(vec![], protocol_config_v11); // Case #1: Receive BatchV1 and network has not upgraded to 12 so we are okay let res_batch = validator @@ -261,11 +253,7 @@ mod tests { .await; assert!(res_batch.is_err()); - let batch_v2 = Batch::new( - vec![], - latest_protocol_config, - validator.epoch_store.epoch(), - ); + let batch_v2 = Batch::new(vec![], latest_protocol_config); // Case #3: Receive BatchV2 but network is still in v11 so we fail because we expect BatchV1 let res_batch = validator .validate_batch(&batch_v2, protocol_config_v11) diff --git a/crates/sui-core/src/narwhal_manager/mod.rs b/crates/sui-core/src/narwhal_manager/mod.rs index 2286a7298e5cc1..b05990e819f354 100644 --- a/crates/sui-core/src/narwhal_manager/mod.rs +++ b/crates/sui-core/src/narwhal_manager/mod.rs @@ -18,13 +18,13 @@ use prometheus::{register_int_gauge_with_registry, IntGauge, Registry}; use std::path::PathBuf; use std::sync::Arc; use std::time::Instant; -use sui_protocol_config::ProtocolConfig; +use sui_protocol_config::{ProtocolVersion, ProtocolConfig}; use sui_types::crypto::{AuthorityKeyPair, NetworkKeyPair}; use tokio::sync::Mutex; #[derive(PartialEq)] enum Running { - True(Epoch), + True(Epoch, ProtocolVersion), False, } @@ -89,25 +89,17 @@ pub struct NarwhalManager { } impl NarwhalManager { - pub fn new( - protocol_config: ProtocolConfig, - config: NarwhalConfiguration, - metrics: NarwhalManagerMetrics, - ) -> Self { + pub fn new(config: NarwhalConfiguration, metrics: NarwhalManagerMetrics) -> Self { // Create the Narwhal Primary with configuration let primary_node = PrimaryNode::new( - protocol_config.clone(), config.parameters.clone(), true, config.registry_service.clone(), ); // Create Narwhal Workers with configuration - let worker_nodes = WorkerNodes::new( - config.registry_service.clone(), - protocol_config, - config.parameters.clone(), - ); + let worker_nodes = + WorkerNodes::new(config.registry_service.clone(), config.parameters.clone()); let store_cache_metrics = CertificateStoreCacheMetrics::new(&config.registry_service.default_registry()); @@ -126,9 +118,15 @@ impl NarwhalManager { } // Starts the Narwhal (primary & worker(s)) - if not already running. + // Note: After a binary is updated with the new protocol version and the node + // is restarted, the protocol config does not take effect until we have a quorum + // of validators have updated the binary. Because of this the protocol upgrade + // will happen in the following epoch after quorum is reached. In this case NarwhalManager + // is not recreated which is why we pass protocol config in at start and not at creation. pub async fn start( &self, committee: Committee, + protocol_config: ProtocolConfig, worker_cache: WorkerCache, execution_state: Arc, tx_validator: TxValidator, @@ -137,10 +135,10 @@ impl NarwhalManager { { let mut running = self.running.lock().await; - if let Running::True(epoch) = *running { + if let Running::True(epoch, version) = *running { tracing::warn!( - "Narwhal node is already Running at epoch {:?} - shutdown first before starting", - epoch + "Narwhal node is already Running for epoch {epoch:?} & version {version:?} - shutdown first before starting", + ); return; } @@ -156,7 +154,11 @@ impl NarwhalManager { let name = self.primary_keypair.public().clone(); - tracing::info!("Starting up Narwhal for epoch {}", committee.epoch()); + tracing::info!( + "Starting up Narwhal for epoch {} & version {:?}", + committee.epoch(), + protocol_config.version + ); // start primary const MAX_PRIMARY_RETRIES: u32 = 2; @@ -168,6 +170,7 @@ impl NarwhalManager { self.primary_keypair.copy(), self.network_keypair.copy(), committee.clone(), + protocol_config.clone(), worker_cache.clone(), network_client.clone(), &store, @@ -206,6 +209,7 @@ impl NarwhalManager { name.clone(), id_keypair_copy, committee.clone(), + protocol_config.clone(), worker_cache.clone(), network_client.clone(), &store, @@ -228,8 +232,9 @@ impl NarwhalManager { } tracing::info!( - "Starting up Narwhal for epoch {} is complete - took {} seconds", + "Starting up Narwhal for epoch {} & version {:?} is complete - took {} seconds", committee.epoch(), + protocol_config.version, now.elapsed().as_secs_f64() ); @@ -242,7 +247,7 @@ impl NarwhalManager { .set(primary_retries as i64); self.metrics.start_worker_retries.set(worker_retries as i64); - *running = Running::True(committee.epoch()); + *running = Running::True(committee.epoch(), protocol_config.version); } // Shuts down whole Narwhal (primary & worker(s)) and waits until nodes @@ -251,16 +256,15 @@ impl NarwhalManager { let mut running = self.running.lock().await; match *running { - Running::True(epoch) => { + Running::True(epoch, version) => { let now = Instant::now(); - tracing::info!("Shutting down Narwhal epoch {:?}", epoch); + tracing::info!("Shutting down Narwhal for epoch {epoch:?} & version {version:?}"); self.primary_node.shutdown().await; self.worker_nodes.shutdown().await; tracing::info!( - "Narwhal shutdown for epoch {:?} is complete - took {} seconds", - epoch, + "Narwhal shutdown for epoch {epoch:?} & version {version:?} is complete - took {} seconds", now.elapsed().as_secs_f64() ); diff --git a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs index c253d14247b047..470e1a4c2aecdb 100644 --- a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs @@ -127,13 +127,13 @@ async fn test_narwhal_manager() { let metrics = NarwhalManagerMetrics::new(&Registry::new()); - let narwhal_manager = - NarwhalManager::new(latest_protocol_version(), narwhal_config, metrics); + let narwhal_manager = NarwhalManager::new(narwhal_config, metrics); // start narwhal narwhal_manager .start( narwhal_committee.clone(), + latest_protocol_version(), worker_cache.clone(), Arc::new(execution_state.clone()), TrivialTransactionValidator::default(), @@ -195,6 +195,7 @@ async fn test_narwhal_manager() { narwhal_manager .start( narwhal_committee.clone(), + latest_protocol_version(), worker_cache.clone(), Arc::new(execution_state.clone()), TrivialTransactionValidator::default(), diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index ee89e03b1e8443..96146862795e81 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -676,12 +676,8 @@ impl SuiNode { connection_monitor_status.clone(), ®istry_service.default_registry(), )); - let narwhal_manager = Self::construct_narwhal_manager( - epoch_store.protocol_config().clone(), - config, - consensus_config, - registry_service, - )?; + let narwhal_manager = + Self::construct_narwhal_manager(config, consensus_config, registry_service)?; let mut narwhal_epoch_data_remover = EpochDataRemover::new(narwhal_manager.get_storage_base_path()); @@ -776,6 +772,7 @@ impl SuiNode { narwhal_manager .start( new_epoch_start_state.get_narwhal_committee(), + epoch_store.protocol_config().clone(), worker_cache, consensus_handler, SuiTxValidator::new( @@ -846,7 +843,6 @@ impl SuiNode { } fn construct_narwhal_manager( - protocol_config: ProtocolConfig, config: &NodeConfig, consensus_config: &ConsensusConfig, registry_service: &RegistryService, @@ -862,11 +858,7 @@ impl SuiNode { let metrics = NarwhalManagerMetrics::new(®istry_service.default_registry()); - Ok(NarwhalManager::new( - protocol_config, - narwhal_config, - metrics, - )) + Ok(NarwhalManager::new(narwhal_config, metrics)) } fn construct_consensus_adapter( diff --git a/narwhal/node/src/main.rs b/narwhal/node/src/main.rs index 7f46c10dee3894..9c68a43961e931 100644 --- a/narwhal/node/src/main.rs +++ b/narwhal/node/src/main.rs @@ -282,7 +282,6 @@ async fn run( // Spawn the primary and consensus core. ("primary", Some(sub_matches)) => { let primary = PrimaryNode::new( - ProtocolConfig::get_for_version(ProtocolVersion::max()), parameters.clone(), !sub_matches.is_present("consensus-disabled"), registry_service, @@ -293,6 +292,7 @@ async fn run( primary_keypair, primary_network_keypair, committee, + ProtocolConfig::get_for_version(ProtocolVersion::max()), worker_cache, client.clone(), &store, diff --git a/narwhal/node/src/primary_node.rs b/narwhal/node/src/primary_node.rs index 391c840dd25f82..c05a864f17d7cd 100644 --- a/narwhal/node/src/primary_node.rs +++ b/narwhal/node/src/primary_node.rs @@ -27,7 +27,6 @@ use tracing::{debug, info, instrument}; use types::{Certificate, ConditionalBroadcastReceiver, PreSubscribedBroadcastSender, Round}; struct PrimaryNodeInner { - protocol_config: ProtocolConfig, // The configuration parameters. parameters: Parameters, // Whether to run consensus (and an executor client) or not. @@ -68,6 +67,7 @@ impl PrimaryNodeInner { network_keypair: NetworkKeyPair, // The committee information. committee: Committee, + protocol_config: ProtocolConfig, // The worker information cache. worker_cache: WorkerCache, // Client for communications. @@ -101,7 +101,7 @@ impl PrimaryNodeInner { worker_cache, client, store, - self.protocol_config.clone(), + protocol_config.clone(), self.parameters.clone(), self.internal_consensus, execution_state, @@ -406,13 +406,11 @@ pub struct PrimaryNode { impl PrimaryNode { pub fn new( - protocol_config: ProtocolConfig, parameters: Parameters, internal_consensus: bool, registry_service: RegistryService, ) -> PrimaryNode { let inner = PrimaryNodeInner { - protocol_config, parameters, internal_consensus, registry_service, @@ -435,6 +433,7 @@ impl PrimaryNode { network_keypair: NetworkKeyPair, // The committee information. committee: Committee, + protocol_config: ProtocolConfig, // The worker information cache. worker_cache: WorkerCache, // Client for communications. @@ -455,6 +454,7 @@ impl PrimaryNode { keypair, network_keypair, committee, + protocol_config, worker_cache, client, store, diff --git a/narwhal/node/src/worker_node.rs b/narwhal/node/src/worker_node.rs index d2db669ef8f640..9e673497bf9791 100644 --- a/narwhal/node/src/worker_node.rs +++ b/narwhal/node/src/worker_node.rs @@ -260,22 +260,16 @@ pub struct WorkerNodes { workers: ArcSwap>, registry_service: RegistryService, registry_id: ArcSwapOption, - protocol_config: ProtocolConfig, parameters: Parameters, client: ArcSwapOption, } impl WorkerNodes { - pub fn new( - registry_service: RegistryService, - protocol_config: ProtocolConfig, - parameters: Parameters, - ) -> Self { + pub fn new(registry_service: RegistryService, parameters: Parameters) -> Self { Self { workers: ArcSwap::from(Arc::new(HashMap::default())), registry_service, registry_id: ArcSwapOption::empty(), - protocol_config, parameters, client: ArcSwapOption::empty(), } @@ -290,6 +284,7 @@ impl WorkerNodes { ids_and_keypairs: Vec<(WorkerId, NetworkKeyPair)>, // The committee information. committee: Committee, + protocol_config: ProtocolConfig, // The worker information cache. worker_cache: WorkerCache, // Client for communications. @@ -321,7 +316,7 @@ impl WorkerNodes { for (worker_id, key_pair) in ids_and_keypairs { let worker = WorkerNode::new( worker_id, - self.protocol_config.clone(), + protocol_config.clone(), self.parameters.clone(), self.registry_service.clone(), ); diff --git a/narwhal/node/tests/node_test.rs b/narwhal/node/tests/node_test.rs index 2bf8876a19ab43..48b4446c84d6ef 100644 --- a/narwhal/node/tests/node_test.rs +++ b/narwhal/node/tests/node_test.rs @@ -43,17 +43,13 @@ async fn simple_primary_worker_node_start_stop() { let execution_state = Arc::new(SimpleExecutionState::new(tx_confirmation)); // WHEN - let primary_node = PrimaryNode::new( - latest_protocol_version(), - parameters.clone(), - true, - registry_service.clone(), - ); + let primary_node = PrimaryNode::new(parameters.clone(), true, registry_service.clone()); primary_node .start( key_pair.copy(), network_key_pair.copy(), committee.clone(), + latest_protocol_version(), worker_cache.clone(), client.clone(), &store, @@ -63,17 +59,14 @@ async fn simple_primary_worker_node_start_stop() { .unwrap(); // AND - let workers = WorkerNodes::new( - registry_service, - latest_protocol_version(), - parameters.clone(), - ); + let workers = WorkerNodes::new(registry_service, parameters.clone()); workers .start( key_pair.public().clone(), vec![(0, authority.worker(0).keypair().copy())], committee, + latest_protocol_version(), worker_cache, client, &store, @@ -134,17 +127,13 @@ async fn primary_node_restart() { let execution_state = Arc::new(SimpleExecutionState::new(tx_confirmation)); // AND - let primary_node = PrimaryNode::new( - latest_protocol_version(), - parameters.clone(), - true, - registry_service.clone(), - ); + let primary_node = PrimaryNode::new(parameters.clone(), true, registry_service.clone()); primary_node .start( key_pair.copy(), network_key_pair.copy(), committee.clone(), + latest_protocol_version(), worker_cache.clone(), client.clone(), &store, @@ -166,6 +155,7 @@ async fn primary_node_restart() { key_pair.copy(), network_key_pair.copy(), committee.clone(), + latest_protocol_version(), worker_cache.clone(), client.clone(), &store, diff --git a/narwhal/node/tests/staged/narwhal.yaml b/narwhal/node/tests/staged/narwhal.yaml index e5b7a0b3cd8f53..bf6d70a3cbb116 100644 --- a/narwhal/node/tests/staged/narwhal.yaml +++ b/narwhal/node/tests/staged/narwhal.yaml @@ -25,7 +25,6 @@ BatchV1: TYPENAME: Metadata BatchV2: STRUCT: - - epoch: U64 - transactions: SEQ: SEQ: U8 diff --git a/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs b/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs index f56476057c4dd1..0a66222822db79 100644 --- a/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs +++ b/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs @@ -54,10 +54,8 @@ async fn test_successful_headers_synchronization() { // TODO: duplicated code in this file. // AND generate headers with distributed batches between 2 workers (0 and 1) for _ in 0..8 { - let batch_1 = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); - let batch_2 = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let batch_1 = fixture_batch_with_transactions(10, &latest_protocol_version()); + let batch_2 = fixture_batch_with_transactions(10, &latest_protocol_version()); let header = Header::V1( author @@ -203,10 +201,8 @@ async fn test_successful_payload_synchronization() { // AND generate headers with distributed batches between 2 workers (0 and 1) for _ in 0..8 { - let batch_1 = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); - let batch_2 = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let batch_1 = fixture_batch_with_transactions(10, &latest_protocol_version()); + let batch_2 = fixture_batch_with_transactions(10, &latest_protocol_version()); let header = Header::V1( author @@ -388,11 +384,7 @@ async fn test_timeout_while_waiting_for_certificates() { author .header_builder(&committee) .with_payload_batch( - fixture_batch_with_transactions( - 10, - &latest_protocol_version(), - committee.epoch(), - ), + fixture_batch_with_transactions(10, &latest_protocol_version()), 0, 0, ) @@ -534,8 +526,7 @@ async fn test_reply_with_certificates_already_in_storage() { // AND storing some certificates for i in 1..=8 { - let batch = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let batch = fixture_batch_with_transactions(10, &latest_protocol_version()); let header = Header::V1( author @@ -639,8 +630,7 @@ async fn test_reply_with_payload_already_in_storage() { // AND storing some certificates for i in 1..=8 { - let batch = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let batch = fixture_batch_with_transactions(10, &latest_protocol_version()); let header = Header::V1( author @@ -748,8 +738,7 @@ async fn test_reply_with_payload_already_in_storage_for_own_certificates() { // AND storing some certificates for _ in 0..5 { - let batch = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let batch = fixture_batch_with_transactions(10, &latest_protocol_version()); let header = Header::V1( primary diff --git a/narwhal/primary/src/block_synchronizer/tests/handler_tests.rs b/narwhal/primary/src/block_synchronizer/tests/handler_tests.rs index da46af888af0b6..0089b536534f31 100644 --- a/narwhal/primary/src/block_synchronizer/tests/handler_tests.rs +++ b/narwhal/primary/src/block_synchronizer/tests/handler_tests.rs @@ -36,11 +36,7 @@ async fn test_get_and_synchronize_block_headers_when_fetched_from_storage() { let header = Header::V1( author .header_builder(&committee) - .payload(fixture_payload( - 1, - &latest_protocol_version(), - committee.epoch(), - )) + .payload(fixture_payload(1, &latest_protocol_version())) .build() .unwrap(), ); @@ -100,11 +96,7 @@ async fn test_get_and_synchronize_block_headers_when_fetched_from_peers() { let header = Header::V1( author .header_builder(&committee) - .payload(fixture_payload( - 1, - &latest_protocol_version(), - committee.epoch(), - )) + .payload(fixture_payload(1, &latest_protocol_version())) .build() .unwrap(), ); @@ -115,11 +107,7 @@ async fn test_get_and_synchronize_block_headers_when_fetched_from_peers() { let header = Header::V1( author .header_builder(&committee) - .payload(fixture_payload( - 2, - &latest_protocol_version(), - committee.epoch(), - )) + .payload(fixture_payload(2, &latest_protocol_version())) .build() .unwrap(), ); @@ -211,11 +199,7 @@ async fn test_get_and_synchronize_block_headers_timeout_on_causal_completion() { let header = Header::V1( author .header_builder(&committee) - .payload(fixture_payload( - 1, - &latest_protocol_version(), - committee.epoch(), - )) + .payload(fixture_payload(1, &latest_protocol_version())) .build() .unwrap(), ); @@ -226,11 +210,7 @@ async fn test_get_and_synchronize_block_headers_timeout_on_causal_completion() { let header = Header::V1( author .header_builder(&committee) - .payload(fixture_payload( - 2, - &latest_protocol_version(), - committee.epoch(), - )) + .payload(fixture_payload(2, &latest_protocol_version())) .build() .unwrap(), ); @@ -304,11 +284,7 @@ async fn test_synchronize_block_payload() { let header = Header::V1( author .header_builder(&committee) - .payload(fixture_payload( - 1, - &latest_protocol_version(), - committee.epoch(), - )) + .payload(fixture_payload(1, &latest_protocol_version())) .build() .unwrap(), ); @@ -321,11 +297,7 @@ async fn test_synchronize_block_payload() { let header = Header::V1( author .header_builder(&committee) - .payload(fixture_payload( - 2, - &latest_protocol_version(), - committee.epoch(), - )) + .payload(fixture_payload(2, &latest_protocol_version())) .build() .unwrap(), ); diff --git a/narwhal/primary/src/tests/block_remover_tests.rs b/narwhal/primary/src/tests/block_remover_tests.rs index 7bdb8d55031a8b..8c517c2c118d3d 100644 --- a/narwhal/primary/src/tests/block_remover_tests.rs +++ b/narwhal/primary/src/tests/block_remover_tests.rs @@ -68,16 +68,8 @@ async fn test_successful_blocks_delete() { // AND generate headers with distributed batches between 2 workers (0 and 1) for _headers in 0..5 { - let batch_1 = test_utils::fixture_batch_with_transactions( - 10, - &latest_protocol_version(), - committee.epoch(), - ); - let batch_2 = test_utils::fixture_batch_with_transactions( - 10, - &latest_protocol_version(), - committee.epoch(), - ); + let batch_1 = test_utils::fixture_batch_with_transactions(10, &latest_protocol_version()); + let batch_2 = test_utils::fixture_batch_with_transactions(10, &latest_protocol_version()); let header = Header::V1( author @@ -245,16 +237,8 @@ async fn test_failed_blocks_delete() { // AND generate headers with distributed batches between 2 workers (0 and 1) for _headers in 0..5 { - let batch_1 = test_utils::fixture_batch_with_transactions( - 10, - &latest_protocol_version(), - committee.epoch(), - ); - let batch_2 = test_utils::fixture_batch_with_transactions( - 10, - &latest_protocol_version(), - committee.epoch(), - ); + let batch_1 = test_utils::fixture_batch_with_transactions(10, &latest_protocol_version()); + let batch_2 = test_utils::fixture_batch_with_transactions(10, &latest_protocol_version()); let header = Header::V1( author diff --git a/narwhal/primary/src/tests/block_waiter_tests.rs b/narwhal/primary/src/tests/block_waiter_tests.rs index 413f88733ea310..6277e000a84e27 100644 --- a/narwhal/primary/src/tests/block_waiter_tests.rs +++ b/narwhal/primary/src/tests/block_waiter_tests.rs @@ -33,11 +33,7 @@ async fn test_successfully_retrieve_block() { let header = Header::V1( author .header_builder(&committee) - .payload(fixture_payload( - 2, - &latest_protocol_version(), - committee.epoch(), - )) + .payload(fixture_payload(2, &latest_protocol_version())) .build() .unwrap(), ); @@ -58,7 +54,6 @@ async fn test_successfully_retrieve_block() { let expected_block_count = header.payload().len(); for (batch_digest, _) in header.payload() { let batch_digest_clone = *batch_digest; - let epoch = committee.epoch(); mock_server .expect_request_batch() .withf(move |request| request.body().batch == batch_digest_clone) @@ -67,7 +62,6 @@ async fn test_successfully_retrieve_block() { batch: Some(Batch::new( vec![vec![10u8, 5u8, 2u8], vec![8u8, 2u8, 3u8]], &latest_protocol_version(), - epoch, )), })) }); @@ -130,18 +124,14 @@ async fn test_successfully_retrieve_multiple_blocks() { // Batches to be used as "commons" between headers // Practically we want to test the case where different headers happen // to refer to batches with same id. - let common_batch_1 = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); - let common_batch_2 = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let common_batch_1 = fixture_batch_with_transactions(10, &latest_protocol_version()); + let common_batch_2 = fixture_batch_with_transactions(10, &latest_protocol_version()); for i in 0..10 { let mut builder = author.header_builder(&committee); - let batch_1 = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); - let batch_2 = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let batch_1 = fixture_batch_with_transactions(10, &latest_protocol_version()); + let batch_2 = fixture_batch_with_transactions(10, &latest_protocol_version()); builder = builder .with_payload_batch(batch_1.clone(), worker_id, 0) diff --git a/narwhal/primary/src/tests/certificate_fetcher_tests.rs b/narwhal/primary/src/tests/certificate_fetcher_tests.rs index d2bbda198d2662..99803b155f038d 100644 --- a/narwhal/primary/src/tests/certificate_fetcher_tests.rs +++ b/narwhal/primary/src/tests/certificate_fetcher_tests.rs @@ -239,12 +239,7 @@ async fn fetch_certificates_basic() { .into_iter() .map(|header| fixture.certificate(&header).digest()) .collect(); - (_, current_round) = fixture.headers_round( - i, - &parents, - &latest_protocol_version(), - fixture.committee().epoch(), - ); + (_, current_round) = fixture.headers_round(i, &parents, &latest_protocol_version()); headers.extend(current_round.clone()); } diff --git a/narwhal/primary/src/tests/primary_tests.rs b/narwhal/primary/src/tests/primary_tests.rs index 7cad6001c2fcd3..0c7cdf523b281e 100644 --- a/narwhal/primary/src/tests/primary_tests.rs +++ b/narwhal/primary/src/tests/primary_tests.rs @@ -385,7 +385,6 @@ async fn test_request_vote_has_missing_parents() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - committee.epoch(), ), 0, 0, @@ -559,7 +558,6 @@ async fn test_request_vote_accept_missing_parents() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - committee.epoch(), ), 0, 0, @@ -696,7 +694,6 @@ async fn test_request_vote_missing_batches() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - fixture.committee().epoch(), ), 0, 0, @@ -723,7 +720,6 @@ async fn test_request_vote_missing_batches() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - fixture.committee().epoch(), ), 1, 0, @@ -849,7 +845,6 @@ async fn test_request_vote_already_voted() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - fixture.committee().epoch(), ), 0, 0, @@ -897,7 +892,6 @@ async fn test_request_vote_already_voted() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - fixture.committee().epoch(), ), 1, 0, @@ -950,7 +944,6 @@ async fn test_request_vote_already_voted() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - fixture.committee().epoch(), ), 1, 0, @@ -1045,12 +1038,8 @@ async fn test_fetch_certificates_handler() { .into_iter() .map(|header| fixture.certificate(&header).digest()) .collect(); - (_, current_round) = fixture.headers_round( - i, - &parents, - &test_utils::latest_protocol_version(), - fixture.committee().epoch(), - ); + (_, current_round) = + fixture.headers_round(i, &parents, &test_utils::latest_protocol_version()); headers.extend(current_round.clone()); } @@ -1224,7 +1213,6 @@ async fn test_process_payload_availability_success() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - fixture.committee().epoch(), ), 0, 0, @@ -1389,7 +1377,6 @@ async fn test_process_payload_availability_when_failures() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - fixture.committee().epoch(), ), 0, 0, @@ -1503,7 +1490,6 @@ async fn test_request_vote_created_at_in_future() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - fixture.committee().epoch(), ), 0, 0, @@ -1555,7 +1541,6 @@ async fn test_request_vote_created_at_in_future() { test_utils::fixture_batch_with_transactions( 10, &test_utils::latest_protocol_version(), - fixture.committee().epoch(), ), 1, 0, @@ -1591,11 +1576,7 @@ async fn test_request_vote_created_at_in_future() { .round(2) .parents(certificates.keys().cloned().collect()) .with_payload_batch( - test_utils::fixture_batch_with_transactions( - 10, - &test_utils::latest_protocol_version(), - fixture.committee().epoch(), - ), + test_utils::fixture_batch_with_transactions(10, &test_utils::latest_protocol_version()), 1, 0, ) diff --git a/narwhal/primary/src/tests/proposer_tests.rs b/narwhal/primary/src/tests/proposer_tests.rs index 4731e6bcc52fd1..240e4dcb11e83e 100644 --- a/narwhal/primary/src/tests/proposer_tests.rs +++ b/narwhal/primary/src/tests/proposer_tests.rs @@ -121,11 +121,8 @@ async fn propose_payload_and_repropose_after_n_seconds() { assert!(header.validate(&committee, &worker_cache).is_ok()); // WHEN available batches are more than the maximum ones - let batches: IndexMap = fixture_payload( - (max_num_of_batches * 2) as u8, - &latest_protocol_version(), - committee.epoch(), - ); + let batches: IndexMap = + fixture_payload((max_num_of_batches * 2) as u8, &latest_protocol_version()); let mut ack_list = vec![]; for (batch_id, (worker_id, created_at)) in batches { diff --git a/narwhal/primary/src/tests/synchronizer_tests.rs b/narwhal/primary/src/tests/synchronizer_tests.rs index 1f19c0d5f126ee..d7dd56f8757ff2 100644 --- a/narwhal/primary/src/tests/synchronizer_tests.rs +++ b/narwhal/primary/src/tests/synchronizer_tests.rs @@ -892,11 +892,7 @@ async fn sync_batches_drops_old() { author .header_builder(&fixture.committee()) .with_payload_batch( - test_utils::fixture_batch_with_transactions( - 10, - &latest_protocol_version(), - fixture.committee().epoch(), - ), + test_utils::fixture_batch_with_transactions(10, &latest_protocol_version()), 0, 0, ) @@ -919,11 +915,7 @@ async fn sync_batches_drops_old() { .round(2) .parents(certificates.keys().cloned().collect()) .with_payload_batch( - test_utils::fixture_batch_with_transactions( - 10, - &latest_protocol_version(), - fixture.committee().epoch(), - ), + test_utils::fixture_batch_with_transactions(10, &latest_protocol_version()), 1, 0, ) diff --git a/narwhal/primary/tests/integration_tests_validator_api.rs b/narwhal/primary/tests/integration_tests_validator_api.rs index ba26e62df3f2ad..3d706f3691490b 100644 --- a/narwhal/primary/tests/integration_tests_validator_api.rs +++ b/narwhal/primary/tests/integration_tests_validator_api.rs @@ -59,8 +59,7 @@ async fn test_get_collections() { // Generate headers for n in 0..5 { - let batch = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let batch = fixture_batch_with_transactions(10, &latest_protocol_version()); let header = Header::V1( author @@ -275,8 +274,7 @@ async fn test_remove_collections() { // Generate headers for n in 0..5 { - let batch = - fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let batch = fixture_batch_with_transactions(10, &latest_protocol_version()); let header = Header::V1( author @@ -1206,7 +1204,7 @@ async fn fixture_certificate( payload_store: PayloadStore, batch_store: DBMap, ) -> (Certificate, Batch) { - let batch = fixture_batch_with_transactions(10, &latest_protocol_version(), committee.epoch()); + let batch = fixture_batch_with_transactions(10, &latest_protocol_version()); let worker_id = 0; let batch_digest = batch.digest(); diff --git a/narwhal/storage/src/certificate_store.rs b/narwhal/storage/src/certificate_store.rs index 4eaea5cecc88f4..a7c3d5fe615062 100644 --- a/narwhal/storage/src/certificate_store.rs +++ b/narwhal/storage/src/certificate_store.rs @@ -764,8 +764,7 @@ mod test { .iter() .map(|header| fixture.certificate(header).digest()) .collect(); - (_, current_round) = - fixture.headers_round(i, &parents, &latest_protocol_version(), committee.epoch()); + (_, current_round) = fixture.headers_round(i, &parents, &latest_protocol_version()); result.extend( current_round diff --git a/narwhal/storage/src/payload_store.rs b/narwhal/storage/src/payload_store.rs index 57d130350dbe99..19db5116fd151d 100644 --- a/narwhal/storage/src/payload_store.rs +++ b/narwhal/storage/src/payload_store.rs @@ -142,7 +142,7 @@ mod tests { // run the tests a few times let batch: Batch = - test_utils::fixture_batch_with_transactions(10, &latest_protocol_version(), 0); + test_utils::fixture_batch_with_transactions(10, &latest_protocol_version()); let id = batch.digest(); let worker_id = 0; diff --git a/narwhal/storage/src/proposer_store.rs b/narwhal/storage/src/proposer_store.rs index 26ce18a287c844..c90604c2398bcd 100644 --- a/narwhal/storage/src/proposer_store.rs +++ b/narwhal/storage/src/proposer_store.rs @@ -71,11 +71,7 @@ mod test { .epoch(fixture.committee().epoch()) .parents([CertificateDigest::default()].iter().cloned().collect()) .with_payload_batch( - fixture_batch_with_transactions( - 10, - &latest_protocol_version(), - fixture.committee().epoch(), - ), + fixture_batch_with_transactions(10, &latest_protocol_version()), 0, 0, ) diff --git a/narwhal/test-utils/src/cluster.rs b/narwhal/test-utils/src/cluster.rs index 4035af40213af5..7712ffd5708c7b 100644 --- a/narwhal/test-utils/src/cluster.rs +++ b/narwhal/test-utils/src/cluster.rs @@ -304,7 +304,6 @@ impl PrimaryNodeDetails { let registry_service = RegistryService::new(Registry::new()); let node = PrimaryNode::new( - latest_protocol_version(), parameters.clone(), internal_consensus_enabled, registry_service, @@ -365,6 +364,7 @@ impl PrimaryNodeDetails { self.key_pair.copy(), self.network_key_pair.copy(), self.committee.clone(), + latest_protocol_version(), self.worker_cache.clone(), client, &primary_store, diff --git a/narwhal/test-utils/src/lib.rs b/narwhal/test-utils/src/lib.rs index 1b29f3f44b08ed..b889e63ee020e5 100644 --- a/narwhal/test-utils/src/lib.rs +++ b/narwhal/test-utils/src/lib.rs @@ -140,12 +140,11 @@ pub fn random_key() -> KeyPair { pub fn fixture_payload( number_of_batches: u8, protocol_config: &ProtocolConfig, - epoch: Epoch, ) -> IndexMap { let mut payload: IndexMap = IndexMap::new(); for _ in 0..number_of_batches { - let batch_digest = batch(protocol_config, epoch).digest(); + let batch_digest = batch(protocol_config).digest(); payload.insert(batch_digest, (0, 0)); } @@ -158,25 +157,23 @@ pub fn fixture_payload( pub fn fixture_batch_with_transactions( number_of_transactions: u32, protocol_config: &ProtocolConfig, - epoch: Epoch, ) -> Batch { let transactions = (0..number_of_transactions) .map(|_v| transaction()) .collect(); - Batch::new(transactions, protocol_config, epoch) + Batch::new(transactions, protocol_config) } pub fn fixture_payload_with_rand( number_of_batches: u8, rand: &mut R, protocol_config: &ProtocolConfig, - epoch: Epoch, ) -> IndexMap { let mut payload: IndexMap = IndexMap::new(); for _ in 0..number_of_batches { - let batch_digest = batch_with_rand(rand, protocol_config, epoch).digest(); + let batch_digest = batch_with_rand(rand, protocol_config).digest(); payload.insert(batch_digest, (0, 0)); } @@ -191,15 +188,10 @@ pub fn transaction_with_rand(rand: &mut R) -> Transaction { .collect() } -pub fn batch_with_rand( - rand: &mut R, - protocol_config: &ProtocolConfig, - epoch: Epoch, -) -> Batch { +pub fn batch_with_rand(rand: &mut R, protocol_config: &ProtocolConfig) -> Batch { Batch::new( vec![transaction_with_rand(rand), transaction_with_rand(rand)], protocol_config, - epoch, ) } @@ -388,21 +380,17 @@ impl WorkerToWorker for WorkerToWorkerMockServer { //////////////////////////////////////////////////////////////// // Fixture -pub fn batch(protocol_config: &ProtocolConfig, epoch: Epoch) -> Batch { - Batch::new(vec![transaction(), transaction()], protocol_config, epoch) +pub fn batch(protocol_config: &ProtocolConfig) -> Batch { + Batch::new(vec![transaction(), transaction()], protocol_config) } /// generate multiple fixture batches. The number of generated batches /// are dictated by the parameter num_of_batches. -pub fn batches( - num_of_batches: usize, - protocol_config: &ProtocolConfig, - epoch: Epoch, -) -> Vec { +pub fn batches(num_of_batches: usize, protocol_config: &ProtocolConfig) -> Vec { let mut batches = Vec::new(); for i in 1..num_of_batches + 1 { - batches.push(batch_with_transactions(i, protocol_config, epoch)); + batches.push(batch_with_transactions(i, protocol_config)); } batches @@ -411,7 +399,6 @@ pub fn batches( pub fn batch_with_transactions( num_of_transactions: usize, protocol_config: &ProtocolConfig, - epoch: Epoch, ) -> Batch { let mut transactions = Vec::new(); @@ -419,7 +406,7 @@ pub fn batch_with_transactions( transactions.push(transaction()); } - Batch::new(transactions, protocol_config, epoch) + Batch::new(transactions, protocol_config) } const BATCHES_CF: &str = "batches"; @@ -714,12 +701,7 @@ pub fn mock_certificate_with_rand( .round(round) .epoch(0) .parents(parents) - .payload(fixture_payload_with_rand( - 1, - rand, - protocol_config, - committee.epoch(), - )) + .payload(fixture_payload_with_rand(1, rand, protocol_config)) .build() .unwrap(); let certificate = Certificate::new_unsigned(committee, Header::V1(header), Vec::new()).unwrap(); @@ -754,7 +736,7 @@ pub fn mock_certificate_with_epoch( .round(round) .epoch(epoch) .parents(parents) - .payload(fixture_payload(1, protocol_config, epoch)) + .payload(fixture_payload(1, protocol_config)) .build() .unwrap(); let certificate = Certificate::new_unsigned(committee, Header::V1(header), Vec::new()).unwrap(); @@ -772,7 +754,7 @@ pub fn mock_signed_certificate( ) -> (CertificateDigest, Certificate) { let header_builder = HeaderV1Builder::default() .author(origin) - .payload(fixture_payload(1, protocol_config, committee.epoch())) + .payload(fixture_payload(1, protocol_config)) .round(round) .epoch(0) .parents(parents); @@ -987,7 +969,6 @@ impl CommitteeFixture { prior_round: Round, parents: &BTreeSet, protocol_config: &ProtocolConfig, - epoch: Epoch, ) -> (Round, Vec
) { let round = prior_round + 1; let next_headers = self @@ -1000,11 +981,7 @@ impl CommitteeFixture { .round(round) .epoch(0) .parents(parents.clone()) - .with_payload_batch( - fixture_batch_with_transactions(10, protocol_config, epoch), - 0, - 0, - ) + .with_payload_batch(fixture_batch_with_transactions(10, protocol_config), 0, 0) .build() .unwrap(); Header::V1(header) diff --git a/narwhal/types/benches/batch_digest.rs b/narwhal/types/benches/batch_digest.rs index 57e3dab124b851..ce2149afb4b472 100644 --- a/narwhal/types/benches/batch_digest.rs +++ b/narwhal/types/benches/batch_digest.rs @@ -24,7 +24,6 @@ pub fn batch_digest(c: &mut Criterion) { let batch = Batch::new( (0..size).map(|_| tx_gen()).collect::>(), &latest_protocol_version(), - 0, ); digest_group.throughput(Throughput::Bytes(512 * size as u64)); digest_group.bench_with_input(BenchmarkId::new("batch digest", size), &batch, |b, i| { diff --git a/narwhal/types/src/primary.rs b/narwhal/types/src/primary.rs index f331e7a3db83e1..a06a77f1070b61 100644 --- a/narwhal/types/src/primary.rs +++ b/narwhal/types/src/primary.rs @@ -148,14 +148,10 @@ pub enum Batch { } impl Batch { - pub fn new( - transactions: Vec, - protocol_config: &ProtocolConfig, - epoch: Epoch, - ) -> Self { + pub fn new(transactions: Vec, protocol_config: &ProtocolConfig) -> Self { // TODO: Remove once we have upgraded to protocol version 12. if protocol_config.narwhal_versioned_metadata() { - Self::V2(BatchV2::new(transactions, protocol_config, epoch)) + Self::V2(BatchV2::new(transactions, protocol_config)) } else { Self::V1(BatchV1::new(transactions)) } @@ -190,7 +186,6 @@ pub trait BatchAPI { // BatchV2 APIs fn versioned_metadata(&self) -> &VersionedMetadata; fn versioned_metadata_mut(&mut self) -> &mut VersionedMetadata; - fn epoch(&self) -> &Epoch; } pub type Transaction = Vec; @@ -224,10 +219,6 @@ impl BatchAPI for BatchV1 { fn versioned_metadata_mut(&mut self) -> &mut VersionedMetadata { unimplemented!("BatchV1 does not have a VersionedMetadata field"); } - - fn epoch(&self) -> &Epoch { - unimplemented!("BatchV1 does not have an Epoch field"); - } } impl BatchV1 { @@ -245,7 +236,6 @@ impl BatchV1 { #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Arbitrary)] pub struct BatchV2 { - pub epoch: Epoch, pub transactions: Vec, // This field is not included as part of the batch digest pub versioned_metadata: VersionedMetadata, @@ -275,22 +265,13 @@ impl BatchAPI for BatchV2 { fn versioned_metadata_mut(&mut self) -> &mut VersionedMetadata { &mut self.versioned_metadata } - - fn epoch(&self) -> &Epoch { - &self.epoch - } } impl BatchV2 { - pub fn new( - transactions: Vec, - protocol_config: &ProtocolConfig, - epoch: Epoch, - ) -> Self { + pub fn new(transactions: Vec, protocol_config: &ProtocolConfig) -> Self { Self { transactions, versioned_metadata: VersionedMetadata::new(protocol_config), - epoch, } } @@ -386,12 +367,9 @@ impl Hash<{ crypto::DIGEST_LENGTH }> for BatchV2 { type TypedDigest = BatchDigest; fn digest(&self) -> Self::TypedDigest { - let mut hasher = crypto::DefaultHashFunction::new(); - hasher.update(self.epoch.to_be_bytes()); - hasher.update(crypto::DefaultHashFunction::digest_iterator( - self.transactions.iter(), - )); - BatchDigest::new(hasher.finalize().into()) + BatchDigest::new( + crypto::DefaultHashFunction::digest_iterator(self.transactions.iter()).into(), + ) } } @@ -1620,36 +1598,11 @@ mod tests { Batch, BatchAPI, BatchV1, BatchV2, Metadata, MetadataAPI, MetadataV1, Timestamp, VersionedMetadata, }; - use fastcrypto::hash::Hash; use std::time::Duration; use sui_protocol_config::{ProtocolConfig, ProtocolVersion}; use test_utils::latest_protocol_version; use tokio::time::sleep; - #[tokio::test] - async fn test_batch_digests() { - // TODO: Remove once we have upgraded to protocol version 12. - // BatchV1 - let batchv1_1 = Batch::new( - vec![], - &ProtocolConfig::get_for_version(ProtocolVersion::new(11)), - 0, - ); - let batchv1_2 = Batch::new( - vec![], - &ProtocolConfig::get_for_version(ProtocolVersion::new(11)), - 1, - ); - - assert_eq!(batchv1_1.digest(), batchv1_2.digest()); - - // BatchV2 - let batchv2_1 = Batch::new(vec![], &latest_protocol_version(), 0); - let batchv2_2 = Batch::new(vec![], &latest_protocol_version(), 1); - - assert_ne!(batchv2_1.digest(), batchv2_2.digest()); - } - #[tokio::test] async fn test_elapsed() { // TODO: Remove once we have upgraded to protocol version 12. @@ -1657,7 +1610,6 @@ mod tests { let batch = Batch::new( vec![], &ProtocolConfig::get_for_version(ProtocolVersion::new(11)), - 0, ); assert!(batch.metadata().created_at > 0); @@ -1666,7 +1618,7 @@ mod tests { assert!(batch.metadata().created_at.elapsed().as_secs_f64() >= 2.0); // BatchV2 - let batch = Batch::new(vec![], &latest_protocol_version(), 0); + let batch = Batch::new(vec![], &latest_protocol_version()); assert!(*batch.versioned_metadata().created_at() > 0); @@ -1704,7 +1656,6 @@ mod tests { created_at: 2999309726980, // something in the future - Fri Jan 16 2065 05:35:26 received_at: None, }), - epoch: 0, }); assert_eq!( diff --git a/narwhal/worker/src/batch_fetcher.rs b/narwhal/worker/src/batch_fetcher.rs index c419c61e1e8bca..fc0ebdfcc1f9b8 100644 --- a/narwhal/worker/src/batch_fetcher.rs +++ b/narwhal/worker/src/batch_fetcher.rs @@ -361,8 +361,8 @@ mod tests { let mut network = TestRequestBatchesNetwork::new(); let batch_store = test_utils::create_batch_store(); let v11_protocol_config = get_protocol_config(11); - let batchv1_1 = Batch::new(vec![vec![1]], &v11_protocol_config, 0); - let batchv1_2 = Batch::new(vec![vec![2]], &v11_protocol_config, 0); + let batchv1_1 = Batch::new(vec![vec![1]], &v11_protocol_config); + let batchv1_2 = Batch::new(vec![vec![2]], &v11_protocol_config); let (digests, known_workers) = ( HashSet::from_iter(vec![batchv1_1.digest(), batchv1_2.digest()]), HashSet::from_iter(test_pks(&[1, 2])), @@ -409,8 +409,8 @@ mod tests { let batch_store = test_utils::create_batch_store(); let latest_protocol_config = latest_protocol_version(); let v11_protocol_config = &get_protocol_config(11); - let batchv1_1 = Batch::new(vec![vec![1]], v11_protocol_config, 0); - let batchv1_2 = Batch::new(vec![vec![2]], v11_protocol_config, 0); + let batchv1_1 = Batch::new(vec![vec![1]], v11_protocol_config); + let batchv1_2 = Batch::new(vec![vec![2]], v11_protocol_config); let (digests, known_workers) = ( HashSet::from_iter(vec![batchv1_1.digest(), batchv1_2.digest()]), HashSet::from_iter(test_pks(&[1, 2])), @@ -441,8 +441,8 @@ mod tests { let batch_store = test_utils::create_batch_store(); let latest_protocol_config = &latest_protocol_version(); let v11_protocol_config = get_protocol_config(11); - let batchv2_1 = Batch::new(vec![vec![1]], latest_protocol_config, 0); - let batchv2_2 = Batch::new(vec![vec![2]], latest_protocol_config, 0); + let batchv2_1 = Batch::new(vec![vec![1]], latest_protocol_config); + let batchv2_2 = Batch::new(vec![vec![2]], latest_protocol_config); let (digests, known_workers) = ( HashSet::from_iter(vec![batchv2_1.digest(), batchv2_2.digest()]), HashSet::from_iter(test_pks(&[1, 2])), @@ -471,8 +471,8 @@ mod tests { let mut network = TestRequestBatchesNetwork::new(); let batch_store = test_utils::create_batch_store(); let latest_protocol_config = &latest_protocol_version(); - let batchv2_1 = Batch::new(vec![vec![1]], latest_protocol_config, 0); - let batchv2_2 = Batch::new(vec![vec![2]], latest_protocol_config, 0); + let batchv2_1 = Batch::new(vec![vec![1]], latest_protocol_config); + let batchv2_2 = Batch::new(vec![vec![2]], latest_protocol_config); let (digests, known_workers) = ( HashSet::from_iter(vec![batchv2_1.digest(), batchv2_2.digest()]), HashSet::from_iter(test_pks(&[1, 2])), @@ -523,8 +523,8 @@ mod tests { pub async fn test_fetcher() { let mut network = TestRequestBatchesNetwork::new(); let batch_store = test_utils::create_batch_store(); - let batch1 = Batch::new(vec![vec![1]], &latest_protocol_version(), 0); - let batch2 = Batch::new(vec![vec![2]], &latest_protocol_version(), 0); + let batch1 = Batch::new(vec![vec![1]], &latest_protocol_version()); + let batch2 = Batch::new(vec![vec![2]], &latest_protocol_version()); let (digests, known_workers) = ( HashSet::from_iter(vec![batch1.digest(), batch2.digest()]), HashSet::from_iter(test_pks(&[1, 2])), @@ -569,9 +569,9 @@ mod tests { // and ensure another request is sent to get the remaining batches. let mut network = TestRequestBatchesNetwork::new(); let batch_store = test_utils::create_batch_store(); - let batch1 = Batch::new(vec![vec![1]], &latest_protocol_version(), 0); - let batch2 = Batch::new(vec![vec![2]], &latest_protocol_version(), 0); - let batch3 = Batch::new(vec![vec![3]], &latest_protocol_version(), 0); + let batch1 = Batch::new(vec![vec![1]], &latest_protocol_version()); + let batch2 = Batch::new(vec![vec![2]], &latest_protocol_version()); + let batch3 = Batch::new(vec![vec![3]], &latest_protocol_version()); let (digests, known_workers) = ( HashSet::from_iter(vec![batch1.digest(), batch2.digest(), batch3.digest()]), HashSet::from_iter(test_pks(&[1, 2, 3])), @@ -604,9 +604,9 @@ mod tests { // and ensure another request is sent to get the remaining batches. let mut network = TestRequestBatchesNetwork::new(); let batch_store = test_utils::create_batch_store(); - let batch1 = Batch::new(vec![vec![1]], &latest_protocol_version(), 0); - let batch2 = Batch::new(vec![vec![2]], &latest_protocol_version(), 0); - let batch3 = Batch::new(vec![vec![3]], &latest_protocol_version(), 0); + let batch1 = Batch::new(vec![vec![1]], &latest_protocol_version()); + let batch2 = Batch::new(vec![vec![2]], &latest_protocol_version()); + let batch3 = Batch::new(vec![vec![3]], &latest_protocol_version()); let (digests, known_workers) = ( HashSet::from_iter(vec![batch1.digest(), batch2.digest(), batch3.digest()]), HashSet::from_iter(test_pks(&[2, 3, 4])), @@ -645,9 +645,9 @@ mod tests { pub async fn test_fetcher_local_and_remote() { let mut network = TestRequestBatchesNetwork::new(); let batch_store = test_utils::create_batch_store(); - let batch1 = Batch::new(vec![vec![1]], &latest_protocol_version(), 0); - let batch2 = Batch::new(vec![vec![2]], &latest_protocol_version(), 0); - let batch3 = Batch::new(vec![vec![3]], &latest_protocol_version(), 0); + let batch1 = Batch::new(vec![vec![1]], &latest_protocol_version()); + let batch2 = Batch::new(vec![vec![2]], &latest_protocol_version()); + let batch3 = Batch::new(vec![vec![3]], &latest_protocol_version()); let (digests, known_workers) = ( HashSet::from_iter(vec![batch1.digest(), batch2.digest(), batch3.digest()]), HashSet::from_iter(test_pks(&[1, 2, 3, 4])), @@ -696,7 +696,7 @@ mod tests { let mut local_digests = Vec::new(); // 6 batches available locally with response size limit of 2 for i in 0..num_digests / 2 { - let batch = Batch::new(vec![vec![i]], &latest_protocol_version(), 0); + let batch = Batch::new(vec![vec![i]], &latest_protocol_version()); local_digests.push(batch.digest()); batch_store.insert(&batch.digest(), &batch).unwrap(); network.put(&[1, 2, 3], batch.clone()); @@ -704,7 +704,7 @@ mod tests { } // 6 batches available remotely with response size limit of 2 for i in (num_digests / 2)..num_digests { - let batch = Batch::new(vec![vec![i]], &latest_protocol_version(), 0); + let batch = Batch::new(vec![vec![i]], &latest_protocol_version()); network.put(&[1, 2, 3], batch.clone()); expected_batches.push(batch); } diff --git a/narwhal/worker/src/batch_maker.rs b/narwhal/worker/src/batch_maker.rs index dc70be40daf4ff..f40eae0021117e 100644 --- a/narwhal/worker/src/batch_maker.rs +++ b/narwhal/worker/src/batch_maker.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::metrics::WorkerMetrics; -use config::{Epoch, WorkerId}; +use config::WorkerId; use fastcrypto::hash::Hash; use futures::future::BoxFuture; use futures::stream::FuturesUnordered; @@ -60,7 +60,6 @@ pub struct BatchMaker { /// The batch store to store our own batches. store: DBMap, protocol_config: ProtocolConfig, - epoch: Epoch, } impl BatchMaker { @@ -76,7 +75,6 @@ impl BatchMaker { client: NetworkClient, store: DBMap, protocol_config: ProtocolConfig, - epoch: Epoch, ) -> JoinHandle<()> { spawn_logged_monitored_task!( async move { @@ -92,7 +90,6 @@ impl BatchMaker { client, store, protocol_config, - epoch, } .run() .await; @@ -106,7 +103,7 @@ impl BatchMaker { let timer = sleep(self.max_batch_delay); tokio::pin!(timer); - let mut current_batch = Batch::new(vec![], &self.protocol_config, self.epoch); + let mut current_batch = Batch::new(vec![], &self.protocol_config); let mut current_responses = Vec::new(); let mut current_batch_size = 0; @@ -129,7 +126,7 @@ impl BatchMaker { } self.node_metrics.parallel_worker_batches.set(batch_pipeline.len() as i64); - current_batch = Batch::new(vec![], &self.protocol_config, self.epoch); + current_batch = Batch::new(vec![], &self.protocol_config); current_responses = Vec::new(); current_batch_size = 0; @@ -150,7 +147,7 @@ impl BatchMaker { } self.node_metrics.parallel_worker_batches.set(batch_pipeline.len() as i64); - current_batch = Batch::new(vec![], &self.protocol_config, self.epoch); + current_batch = Batch::new(vec![], &self.protocol_config); current_responses = Vec::new(); current_batch_size = 0; } diff --git a/narwhal/worker/src/tests/batch_maker_tests.rs b/narwhal/worker/src/tests/batch_maker_tests.rs index 3c1f14ed0dd043..0d2aeaac24898b 100644 --- a/narwhal/worker/src/tests/batch_maker_tests.rs +++ b/narwhal/worker/src/tests/batch_maker_tests.rs @@ -43,7 +43,6 @@ async fn make_batch() { client, store.clone(), latest_protocol_version(), - 0, ); // Send enough transactions to seal a batch. @@ -54,7 +53,7 @@ async fn make_batch() { tx_batch_maker.send((tx.clone(), s1)).await.unwrap(); // Ensure the batch is as expected. - let expected_batch = Batch::new(vec![tx.clone(), tx.clone()], &latest_protocol_version(), 0); + let expected_batch = Batch::new(vec![tx.clone(), tx.clone()], &latest_protocol_version()); let (batch, resp) = rx_quorum_waiter.recv().await.unwrap(); assert_eq!(batch.transactions(), expected_batch.transactions()); @@ -100,7 +99,6 @@ async fn batch_timeout() { client, store.clone(), latest_protocol_version(), - 0, ); // Do not send enough transactions to seal a batch. @@ -110,7 +108,7 @@ async fn batch_timeout() { // Ensure the batch is as expected. let (batch, resp) = rx_quorum_waiter.recv().await.unwrap(); - let expected_batch = Batch::new(vec![tx.clone()], &latest_protocol_version(), 0); + let expected_batch = Batch::new(vec![tx.clone()], &latest_protocol_version()); assert_eq!(batch.transactions(), expected_batch.transactions()); // Eventually deliver message diff --git a/narwhal/worker/src/tests/handlers_tests.rs b/narwhal/worker/src/tests/handlers_tests.rs index 6b27fae876ac77..ad2ebeb230acd4 100644 --- a/narwhal/worker/src/tests/handlers_tests.rs +++ b/narwhal/worker/src/tests/handlers_tests.rs @@ -26,7 +26,7 @@ async fn synchronize() { // Create network with mock behavior to respond to RequestBatches request. let target_primary = fixture.authorities().nth(1).unwrap(); - let batch = test_utils::batch(&latest_protocol_config, committee.epoch()); + let batch = test_utils::batch(&latest_protocol_config); let digest = batch.digest(); let message = WorkerSynchronizeMessage { digests: vec![digest], @@ -105,7 +105,7 @@ async fn synchronize_versioned_batches() { // Create network with mock behavior to respond to RequestBatch request. let target_primary = fixture.authorities().nth(1).unwrap(); - let batch_v1 = test_utils::batch(protocol_config_v11, committee.epoch()); + let batch_v1 = test_utils::batch(protocol_config_v11); let digest_v1 = batch_v1.digest(); let message_v1 = WorkerSynchronizeMessage { digests: vec![digest_v1], @@ -113,7 +113,7 @@ async fn synchronize_versioned_batches() { is_certified: false, }; - let batch_v2 = test_utils::batch(latest_protocol_config, committee.epoch()); + let batch_v2 = test_utils::batch(latest_protocol_config); let digest_v2 = batch_v2.digest(); let message_v2 = WorkerSynchronizeMessage { digests: vec![digest_v2], @@ -245,7 +245,7 @@ async fn synchronize_when_batch_exists() { }; // Store the batch. - let batch = test_utils::batch(&latest_protocol_version(), committee.epoch()); + let batch = test_utils::batch(&latest_protocol_version()); let batch_id = batch.digest(); let missing = vec![batch_id]; store.insert(&batch_id, &batch).unwrap(); @@ -276,7 +276,7 @@ async fn delete_batches() { // Create a new test store. let store = test_utils::create_batch_store(); - let batch = test_utils::batch(&latest_protocol_version(), committee.epoch()); + let batch = test_utils::batch(&latest_protocol_version()); let digest = batch.digest(); store.insert(&digest, &batch).unwrap(); diff --git a/narwhal/worker/src/tests/quorum_waiter_tests.rs b/narwhal/worker/src/tests/quorum_waiter_tests.rs index f202cf71329c5d..64cdfeab570c9f 100644 --- a/narwhal/worker/src/tests/quorum_waiter_tests.rs +++ b/narwhal/worker/src/tests/quorum_waiter_tests.rs @@ -33,7 +33,7 @@ async fn wait_for_quorum() { ); // Make a batch. - let batch = batch(&latest_protocol_version(), committee.epoch()); + let batch = batch(&latest_protocol_version()); let message = WorkerBatchMessage { batch: batch.clone(), }; @@ -90,7 +90,7 @@ async fn pipeline_for_quorum() { ); // Make a batch. - let batch = batch(&latest_protocol_version(), committee.epoch()); + let batch = batch(&latest_protocol_version()); let message = WorkerBatchMessage { batch: batch.clone(), }; diff --git a/narwhal/worker/src/tests/worker_tests.rs b/narwhal/worker/src/tests/worker_tests.rs index d8635fe3554816..712ab4051a1ba8 100644 --- a/narwhal/worker/src/tests/worker_tests.rs +++ b/narwhal/worker/src/tests/worker_tests.rs @@ -118,7 +118,7 @@ async fn reject_invalid_clients_transactions() { let worker_pk = worker_cache.worker(&public_key, &worker_id).unwrap().name; - let batch = batch(&latest_protocol_version(), committee.epoch()); + let batch = batch(&latest_protocol_version()); let batch_message = WorkerBatchMessage { batch: batch.clone(), }; @@ -197,7 +197,7 @@ async fn handle_remote_clients_transactions() { let mut peer_networks = Vec::new(); // Create batches - let batch = batch(&latest_protocol_version(), committee.epoch()); + let batch = batch(&latest_protocol_version()); let batch_digest = batch.digest(); let (tx_await_batch, mut rx_await_batch) = test_utils::test_channel!(CHANNEL_CAPACITY); @@ -316,7 +316,7 @@ async fn handle_local_clients_transactions() { let mut peer_networks = Vec::new(); // Create batches - let batch = batch(&latest_protocol_version(), committee.epoch()); + let batch = batch(&latest_protocol_version()); let batch_digest = batch.digest(); let (tx_await_batch, mut rx_await_batch) = test_utils::test_channel!(CHANNEL_CAPACITY); diff --git a/narwhal/worker/src/worker.rs b/narwhal/worker/src/worker.rs index dcea35826b80a8..dddeed7945df24 100644 --- a/narwhal/worker/src/worker.rs +++ b/narwhal/worker/src/worker.rs @@ -494,7 +494,6 @@ impl Worker { client, self.store.clone(), self.protocol_config.clone(), - self.committee.epoch(), ); // The `QuorumWaiter` waits for 2f authorities to acknowledge reception of the batch. It then forwards