From 4c442e160a205088a5b0e5323fe049df3acc9a45 Mon Sep 17 00:00:00 2001 From: Arun Koshy <97870774+arun-koshy@users.noreply.github.com> Date: Wed, 24 May 2023 16:59:06 -0700 Subject: [PATCH] Address review comments --- crates/sui-core/src/consensus_validator.rs | 1 + crates/sui-core/src/narwhal_manager/mod.rs | 6 +- .../src/unit_tests/narwhal_manager_tests.rs | 2 +- crates/sui-node/src/lib.rs | 6 +- crates/sui-protocol-config/src/lib.rs | 2 +- .../consensus/src/tests/bullshark_tests.rs | 66 +++++------ .../consensus/src/tests/consensus_tests.rs | 2 +- narwhal/consensus/src/tests/dag_tests.rs | 20 ++-- .../consensus/src/tests/randomized_tests.rs | 2 +- narwhal/consensus/src/tests/tusk_tests.rs | 14 +-- narwhal/executor/src/lib.rs | 4 +- narwhal/executor/src/metrics.rs | 6 +- narwhal/executor/src/subscriber.rs | 14 +-- .../tests/consensus_integration_tests.rs | 6 +- narwhal/network/src/client.rs | 9 +- narwhal/network/src/traits.rs | 7 +- narwhal/node/src/generate_format.rs | 4 +- narwhal/node/src/main.rs | 4 +- narwhal/node/src/primary_node.rs | 20 ++-- narwhal/node/src/worker_node.rs | 17 ++- narwhal/node/tests/node_test.rs | 6 +- narwhal/node/tests/staged/narwhal.yaml | 3 +- narwhal/primary/src/primary.rs | 9 +- narwhal/primary/src/tests/primary_tests.rs | 10 +- .../primary/src/tests/synchronizer_tests.rs | 12 +- .../tests/integration_tests_proposer_api.rs | 12 +- .../tests/integration_tests_validator_api.rs | 28 ++--- narwhal/test-utils/src/cluster.rs | 12 +- narwhal/test-utils/src/lib.rs | 30 ++--- narwhal/types/benches/verify_certificate.rs | 2 +- narwhal/types/build.rs | 7 +- narwhal/types/src/primary.rs | 10 +- narwhal/worker/src/batch_fetcher.rs | 111 +++++++++--------- narwhal/worker/src/batch_maker.rs | 6 +- narwhal/worker/src/handlers.rs | 5 +- narwhal/worker/src/tests/batch_maker_tests.rs | 4 +- narwhal/worker/src/tests/handlers_tests.rs | 10 +- narwhal/worker/src/tests/worker_tests.rs | 18 +-- narwhal/worker/src/worker.rs | 14 +-- 39 files changed, 259 insertions(+), 262 deletions(-) diff --git a/crates/sui-core/src/consensus_validator.rs b/crates/sui-core/src/consensus_validator.rs index a75fea605e25dd..872c55aac1a471 100644 --- a/crates/sui-core/src/consensus_validator.rs +++ b/crates/sui-core/src/consensus_validator.rs @@ -157,6 +157,7 @@ mod tests { consensus_adapter::consensus_tests::{test_certificates, test_gas_objects}, consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics}, }; + use narwhal_test_utils::{get_protocol_config, latest_protocol_version}; use narwhal_types::Batch; use narwhal_worker::TransactionValidator; diff --git a/crates/sui-core/src/narwhal_manager/mod.rs b/crates/sui-core/src/narwhal_manager/mod.rs index 38e6b85c32b338..2286a7298e5cc1 100644 --- a/crates/sui-core/src/narwhal_manager/mod.rs +++ b/crates/sui-core/src/narwhal_manager/mod.rs @@ -90,23 +90,23 @@ pub struct NarwhalManager { impl NarwhalManager { pub fn new( + protocol_config: ProtocolConfig, config: NarwhalConfiguration, metrics: NarwhalManagerMetrics, - protocol_config: ProtocolConfig, ) -> Self { // Create the Narwhal Primary with configuration let primary_node = PrimaryNode::new( + protocol_config.clone(), config.parameters.clone(), true, config.registry_service.clone(), - protocol_config.clone(), ); // Create Narwhal Workers with configuration let worker_nodes = WorkerNodes::new( config.registry_service.clone(), - config.parameters.clone(), protocol_config, + config.parameters.clone(), ); let store_cache_metrics = diff --git a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs 
b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs index 79afd84fce9b91..c253d14247b047 100644 --- a/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/narwhal_manager_tests.rs @@ -128,7 +128,7 @@ async fn test_narwhal_manager() { let metrics = NarwhalManagerMetrics::new(&Registry::new()); let narwhal_manager = - NarwhalManager::new(narwhal_config, metrics, latest_protocol_version()); + NarwhalManager::new(latest_protocol_version(), narwhal_config, metrics); // start narwhal narwhal_manager diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index ff51ffd05fdde6..6e7d095657f0a5 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -660,10 +660,10 @@ impl SuiNode { ®istry_service.default_registry(), )); let narwhal_manager = Self::construct_narwhal_manager( + epoch_store.protocol_config().clone(), config, consensus_config, registry_service, - epoch_store.protocol_config().clone(), )?; let mut narwhal_epoch_data_remover = @@ -829,10 +829,10 @@ impl SuiNode { } fn construct_narwhal_manager( + protocol_config: ProtocolConfig, config: &NodeConfig, consensus_config: &ConsensusConfig, registry_service: &RegistryService, - protocol_config: ProtocolConfig, ) -> Result { let narwhal_config = NarwhalConfiguration { primary_keypair: config.protocol_key_pair().copy(), @@ -846,9 +846,9 @@ impl SuiNode { let metrics = NarwhalManagerMetrics::new(®istry_service.default_registry()); Ok(NarwhalManager::new( + protocol_config, narwhal_config, metrics, - protocol_config, )) } diff --git a/crates/sui-protocol-config/src/lib.rs b/crates/sui-protocol-config/src/lib.rs index d69af73e9ae057..01d164fbbd0391 100644 --- a/crates/sui-protocol-config/src/lib.rs +++ b/crates/sui-protocol-config/src/lib.rs @@ -38,7 +38,7 @@ const MAX_PROTOCOL_VERSION: u64 = 12; // framework changes. // Version 11: Introduce `std::type_name::get_with_original_ids` to the system frameworks. // Version 12: Changes to deepbook in framework to add API for querying marketplace. -// Change NW entities to use versioned metadata field. +// Change NW Batch to use versioned metadata field. #[derive(Copy, Clone, Debug, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)] pub struct ProtocolVersion(u64); diff --git a/narwhal/consensus/src/tests/bullshark_tests.rs b/narwhal/consensus/src/tests/bullshark_tests.rs index 61deb55a9dbc60..48ee5df7b0502b 100644 --- a/narwhal/consensus/src/tests/bullshark_tests.rs +++ b/narwhal/consensus/src/tests/bullshark_tests.rs @@ -35,27 +35,27 @@ async fn commit_one() { .collect::>(); let (mut certificates, next_parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=2, &genesis, &ids, - &latest_protocol_version(), ); // Make two certificate (f+1) with round 3 to trigger the commits. let (_, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), ids[0], 3, next_parents.clone(), - &latest_protocol_version(), ); certificates.push_back(certificate); let (_, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), ids[1], 3, next_parents, - &latest_protocol_version(), ); certificates.push_back(certificate); @@ -138,10 +138,10 @@ async fn dead_node() { let (mut certificates, _) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=11, &genesis, &ids, - &latest_protocol_version(), ); // Spawn the consensus engine and sink the primary channel. 
@@ -251,10 +251,10 @@ async fn not_enough_support() { let nodes: Vec<_> = ids.iter().take(3).cloned().collect(); let (out, parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=1, &genesis, &nodes, - &latest_protocol_version(), ); certificates.extend(out); @@ -262,20 +262,20 @@ async fn not_enough_support() { // round is the only one with 4 certificates. let (leader_2_digest, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), ids[0], 2, parents.clone(), - &latest_protocol_version(), ); certificates.push_back(certificate); let nodes: Vec<_> = ids.iter().skip(1).cloned().collect(); let (out, mut parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 2..=2, &parents, &nodes, - &latest_protocol_version(), ); certificates.extend(out); @@ -285,10 +285,10 @@ async fn not_enough_support() { let name = ids[1]; let (digest, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), name, 3, parents.clone(), - &latest_protocol_version(), ); certificates.push_back(certificate); next_parents.insert(digest); @@ -296,10 +296,10 @@ async fn not_enough_support() { let name = ids[2]; let (digest, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), name, 3, parents.clone(), - &latest_protocol_version(), ); certificates.push_back(certificate); next_parents.insert(digest); @@ -308,10 +308,10 @@ async fn not_enough_support() { parents.insert(leader_2_digest); let (digest, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), name, 3, parents.clone(), - &latest_protocol_version(), ); certificates.push_back(certificate); next_parents.insert(digest); @@ -322,24 +322,24 @@ async fn not_enough_support() { let nodes: Vec<_> = ids.to_vec(); let (out, parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 4..=4, &parents, &nodes, - &latest_protocol_version(), ); certificates.extend(out); // Round 5: Send f+1 certificates to trigger the commit of leader 4. let (_, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), ids[0], 5, parents.clone(), - &latest_protocol_version(), ); certificates.push_back(certificate); let (_, certificate) = - test_utils::mock_certificate(&committee, ids[1], 5, parents, &latest_protocol_version()); + test_utils::mock_certificate(&committee, &latest_protocol_version(), ids[1], 5, parents); certificates.push_back(certificate); // Spawn the consensus engine and sink the primary channel. @@ -449,34 +449,34 @@ async fn missing_leader() { let nodes: Vec<_> = ids.iter().skip(1).cloned().collect(); let (out, parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=2, &genesis, &nodes, - &latest_protocol_version(), ); certificates.extend(out); // Add back the leader for rounds 3 and 4. let (out, parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 3..=4, &parents, &ids, - &latest_protocol_version(), ); certificates.extend(out); // Add f+1 certificates of round 5 to commit the leader of round 4. 
let (_, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), ids[0], 5, parents.clone(), - &latest_protocol_version(), ); certificates.push_back(certificate); let (_, certificate) = - test_utils::mock_certificate(&committee, ids[1], 5, parents, &latest_protocol_version()); + test_utils::mock_certificate(&committee, &latest_protocol_version(), ids[1], 5, parents); certificates.push_back(certificate); // Spawn the consensus engine and sink the primary channel. @@ -558,11 +558,11 @@ async fn committed_round_after_restart() { .collect::>(); let (certificates, _) = test_utils::make_certificates_with_epoch( &committee, + &latest_protocol_version(), 1..=11, epoch, &genesis, &ids, - &latest_protocol_version(), ); let store = make_consensus_store(&test_utils::temp_dir()); @@ -663,11 +663,11 @@ async fn delayed_certificates_are_rejected() { let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); let (certificates, _) = test_utils::make_certificates_with_epoch( &committee, + &latest_protocol_version(), 1..=5, epoch, &genesis, &ids, - &latest_protocol_version(), ); let store = make_consensus_store(&test_utils::temp_dir()); @@ -715,11 +715,11 @@ async fn submitting_equivocating_certificate_should_error() { let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); let (certificates, _) = test_utils::make_certificates_with_epoch( &committee, + &latest_protocol_version(), 1..=1, epoch, &genesis, &ids, - &latest_protocol_version(), ); let store = make_consensus_store(&test_utils::temp_dir()); @@ -745,11 +745,11 @@ async fn submitting_equivocating_certificate_should_error() { // them with different epoch as a way to trigger the difference) let (certificates, _) = test_utils::make_certificates_with_epoch( &committee, + &latest_protocol_version(), 1..=1, 100, &genesis, &ids, - &latest_protocol_version(), ); assert_eq!(certificates.len(), 4); @@ -785,11 +785,11 @@ async fn reset_consensus_scores_on_every_schedule_change() { let metrics = Arc::new(ConsensusMetrics::new(&Registry::new())); let (certificates, _) = test_utils::make_certificates_with_epoch( &committee, + &latest_protocol_version(), 1..=50, epoch, &genesis, &ids, - &latest_protocol_version(), ); let store = make_consensus_store(&test_utils::temp_dir()); @@ -889,30 +889,30 @@ async fn restart_with_new_committee() { .collect::>(); let (mut certificates, next_parents) = test_utils::make_certificates_with_epoch( &committee, + &latest_protocol_version(), 1..=2, epoch, &genesis, &ids, - &latest_protocol_version(), ); // Make two certificate (f+1) with round 3 to trigger the commits. 
let (_, certificate) = test_utils::mock_certificate_with_epoch( &committee, + &latest_protocol_version(), ids[0], 3, epoch, next_parents.clone(), - &latest_protocol_version(), ); certificates.push_back(certificate); let (_, certificate) = test_utils::mock_certificate_with_epoch( &committee, + &latest_protocol_version(), ids[1], 3, epoch, next_parents, - &latest_protocol_version(), ); certificates.push_back(certificate); @@ -973,11 +973,11 @@ async fn garbage_collection_basic() { let slow_nodes = vec![(slow_node, 0.0_f64)]; let (certificates, _round_5_certificates) = test_utils::make_certificates_with_slow_nodes( &committee, + &latest_protocol_version(), 1..=7, genesis, &ids, slow_nodes.as_slice(), - &latest_protocol_version(), ); // Create Bullshark consensus engine @@ -1062,11 +1062,11 @@ async fn slow_node() { let slow_nodes = vec![(slow_node, 0.0_f64)]; let (certificates, round_8_certificates) = test_utils::make_certificates_with_slow_nodes( &committee, + &latest_protocol_version(), 1..=8, genesis, &ids, slow_nodes.as_slice(), - &latest_protocol_version(), ); let mut certificates: VecDeque = certificates; @@ -1122,11 +1122,11 @@ async fn slow_node() { // know the leader of each round. let (certificates, _) = test_utils::make_certificates_with_slow_nodes( &committee, + &latest_protocol_version(), 9..=9, round_8_certificates, &ids, &[], - &latest_protocol_version(), ); // send the certificates - they should trigger a commit. @@ -1195,11 +1195,11 @@ async fn not_enough_support_and_missing_leaders_and_gc() { let (mut certificates, round_2_certificates) = test_utils::make_certificates_with_slow_nodes( &committee, + &latest_protocol_version(), 1..=2, genesis, &keys_with_dead_node, &slow_nodes, - &latest_protocol_version(), ); // on round 3 we'll create certificates that don't provide f+1 support to round 2. 
@@ -1214,10 +1214,10 @@ async fn not_enough_support_and_missing_leaders_and_gc() { .collect::>(); let (_, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), *id, 3, parents, - &latest_protocol_version(), ); round_3_certificates.push(certificate); } else { @@ -1229,10 +1229,10 @@ async fn not_enough_support_and_missing_leaders_and_gc() { .collect::>(); let (_, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), *id, 3, parents, - &latest_protocol_version(), ); round_3_certificates.push(certificate); } @@ -1247,7 +1247,7 @@ async fn not_enough_support_and_missing_leaders_and_gc() { .map(|cert| cert.digest()) .collect::>(); let (_, certificate) = - test_utils::mock_certificate(&committee, *id, 4, parents, &latest_protocol_version()); + test_utils::mock_certificate(&committee, &latest_protocol_version(), *id, 4, parents); round_4_certificates.push(certificate); } @@ -1259,11 +1259,11 @@ async fn not_enough_support_and_missing_leaders_and_gc() { let (certificates_5_to_7, _round_7_certificates) = test_utils::make_certificates_with_slow_nodes( &committee, + &latest_protocol_version(), 5..=7, round_4_certificates.clone(), &ids, &slow_nodes, - &latest_protocol_version(), ); // now send all certificates to Bullshark diff --git a/narwhal/consensus/src/tests/consensus_tests.rs b/narwhal/consensus/src/tests/consensus_tests.rs index 3fbd4ac7abf734..1c3b2f35f9c7b4 100644 --- a/narwhal/consensus/src/tests/consensus_tests.rs +++ b/narwhal/consensus/src/tests/consensus_tests.rs @@ -55,10 +55,10 @@ async fn test_consensus_recovery_with_bullshark() { .collect::>(); let (certificates, _next_parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=7, &genesis, &ids, - &latest_protocol_version(), ); // AND Spawn the consensus engine. 
diff --git a/narwhal/consensus/src/tests/dag_tests.rs b/narwhal/consensus/src/tests/dag_tests.rs index c26d570478f078..c8c0569abb3658 100644 --- a/narwhal/consensus/src/tests/dag_tests.rs +++ b/narwhal/consensus/src/tests/dag_tests.rs @@ -25,10 +25,10 @@ async fn inner_dag_insert_one() { .collect::>(); let (mut certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=4, &genesis, &ids, - &latest_protocol_version(), ); let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); @@ -55,10 +55,10 @@ async fn test_dag_read_notify() { .collect::>(); let (mut certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=4, &genesis, &ids, - &latest_protocol_version(), ); let certs = certificates.clone().into_iter().map(|c| (c.digest(), c)); // set up a Dag @@ -114,10 +114,10 @@ async fn test_dag_new_has_genesis_and_its_not_live() { // But the genesis does not come out in read_causal, as is is compressed the moment we add more nodes let (certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=1, &genesis, &ids, - &latest_protocol_version(), ); let mut certs_to_insert = certificates.clone(); @@ -167,10 +167,10 @@ async fn test_dag_compresses_empty_blocks() { // insert one round of empty certificates let (mut certificates, next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=1, &genesis.clone(), &ids, - &latest_protocol_version(), ); // make those empty for cert in certificates.iter_mut() { @@ -191,10 +191,10 @@ async fn test_dag_compresses_empty_blocks() { // Add one round of non-empty certificates let (additional_certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 2..=2, &next_parents, &ids, - &latest_protocol_version(), ); // Feed the additional certificates to the Dag let mut additional_certs_to_insert = additional_certificates.clone(); @@ -245,10 +245,10 @@ async fn test_dag_rounds_after_compression() { // insert one round of empty certificates let (mut certificates, next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=1, &genesis.clone(), &ids, - &latest_protocol_version(), ); // make those empty for cert in certificates.iter_mut() { @@ -264,10 +264,10 @@ async fn test_dag_rounds_after_compression() { // Add one round of non-empty certificates let (additional_certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 2..=2, &next_parents, &ids, - &latest_protocol_version(), ); // Feed the additional certificates to the Dag let mut additional_certs_to_insert = additional_certificates.clone(); @@ -295,10 +295,10 @@ async fn dag_mutation_failures() { .collect::>(); let (certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=4, &genesis, &ids, - &latest_protocol_version(), ); // set up a Dag @@ -370,10 +370,10 @@ async fn dag_insert_one_and_rounds_node_read() { .collect::>(); let (certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=4, &genesis, &ids, - &latest_protocol_version(), ); let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); @@ -423,10 +423,10 @@ async fn dag_insert_and_remove_reads() { .collect::>(); let (mut certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=4, &genesis, &ids, - 
&latest_protocol_version(), ); let mut tx_shutdown = PreSubscribedBroadcastSender::new(NUM_SHUTDOWN_RECEIVERS); diff --git a/narwhal/consensus/src/tests/randomized_tests.rs b/narwhal/consensus/src/tests/randomized_tests.rs index ef7c929ad527d0..e1eb5dd58bdb82 100644 --- a/narwhal/consensus/src/tests/randomized_tests.rs +++ b/narwhal/consensus/src/tests/randomized_tests.rs @@ -416,11 +416,11 @@ pub fn make_certificates_with_parameters( // Now create the certificate with the provided parents let (_, certificate) = mock_certificate_with_rand( committee, + &latest_protocol_version(), authority.id(), round, parents_digests.clone(), &mut rand, - &latest_protocol_version(), ); // group certificates by round for easy access diff --git a/narwhal/consensus/src/tests/tusk_tests.rs b/narwhal/consensus/src/tests/tusk_tests.rs index d3629932af1fba..6877ceb447936b 100644 --- a/narwhal/consensus/src/tests/tusk_tests.rs +++ b/narwhal/consensus/src/tests/tusk_tests.rs @@ -33,10 +33,10 @@ async fn commit_one() { .collect::>(); let (mut certificates, next_parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=4, &genesis, &ids, - &latest_protocol_version(), ); // Make one certificate with round 5 to trigger the commits. @@ -113,10 +113,10 @@ async fn dead_node() { let (mut certificates, _) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=9, &genesis, &ids, - &latest_protocol_version(), ); // Spawn the consensus engine and sink the primary channel. @@ -195,10 +195,10 @@ async fn not_enough_support() { let nodes: Vec<_> = ids.iter().take(3).cloned().collect(); let (out, parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=1, &genesis, &nodes, - &latest_protocol_version(), ); certificates.extend(out); @@ -216,10 +216,10 @@ async fn not_enough_support() { let nodes: Vec<_> = ids.iter().skip(1).cloned().collect(); let (out, mut parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 2..=2, &parents, &nodes, - &latest_protocol_version(), ); certificates.extend(out); @@ -266,10 +266,10 @@ async fn not_enough_support() { let nodes: Vec<_> = ids.iter().take(3).cloned().collect(); let (out, parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 4..=6, &parents, &nodes, - &latest_protocol_version(), ); certificates.extend(out); @@ -358,20 +358,20 @@ async fn missing_leader() { let nodes: Vec<_> = ids.iter().skip(1).cloned().collect(); let (out, parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=2, &genesis, &nodes, - &latest_protocol_version(), ); certificates.extend(out); // Add back the leader for rounds 3, 4, 5 and 6. 
let (out, parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 3..=6, &parents, &ids, - &latest_protocol_version(), ); certificates.extend(out); diff --git a/narwhal/executor/src/lib.rs b/narwhal/executor/src/lib.rs index d88914fcd69ab0..10b9aaa01e1215 100644 --- a/narwhal/executor/src/lib.rs +++ b/narwhal/executor/src/lib.rs @@ -51,13 +51,13 @@ impl Executor { authority_id: AuthorityIdentifier, worker_cache: WorkerCache, committee: Committee, + protocol_config: &ProtocolConfig, client: NetworkClient, execution_state: State, shutdown_receivers: Vec, rx_sequence: metered_channel::Receiver, registry: &Registry, restored_consensus_output: Vec, - protocol_config: &ProtocolConfig, ) -> SubscriberResult>> where State: ExecutionState + Send + Sync + 'static, @@ -72,13 +72,13 @@ impl Executor { authority_id, worker_cache, committee, + protocol_config.clone(), client, shutdown_receivers, rx_sequence, arc_metrics, restored_consensus_output, execution_state, - protocol_config.clone(), ); // Return the handle. diff --git a/narwhal/executor/src/metrics.rs b/narwhal/executor/src/metrics.rs index 38cf5c3c98f457..1a992ff800a33f 100644 --- a/narwhal/executor/src/metrics.rs +++ b/narwhal/executor/src/metrics.rs @@ -37,7 +37,7 @@ pub struct ExecutorMetrics { pub batch_execution_latency: Histogram, /// This is similar to batch_execution_latency but without the latency of /// fetching batches from remote workers. - pub batch_execution_latency_without_network_latency: HistogramVec, + pub batch_execution_local_latency: HistogramVec, /// The number of batches per committed subdag to be fetched pub committed_subdag_batch_count: Histogram, /// Latency for time taken to fetch all batches for committed subdag @@ -93,8 +93,8 @@ impl ExecutorMetrics { LATENCY_SEC_BUCKETS.to_vec(), registry ).unwrap(), - batch_execution_latency_without_network_latency: register_histogram_vec_with_registry!( - "batch_execution_latency_without_network_latency", + batch_execution_local_latency: register_histogram_vec_with_registry!( + "batch_execution_local_latency", "This is similar to batch_execution_latency but without the latency of fetching batches from remote workers.", &["source"], LATENCY_SEC_BUCKETS.to_vec(), diff --git a/narwhal/executor/src/subscriber.rs b/narwhal/executor/src/subscriber.rs index 008b2b6091b8ad..edec7399203c2a 100644 --- a/narwhal/executor/src/subscriber.rs +++ b/narwhal/executor/src/subscriber.rs @@ -43,22 +43,22 @@ struct Inner { authority_id: AuthorityIdentifier, worker_cache: WorkerCache, committee: Committee, + protocol_config: ProtocolConfig, client: NetworkClient, metrics: Arc, - protocol_config: ProtocolConfig, } pub fn spawn_subscriber( authority_id: AuthorityIdentifier, worker_cache: WorkerCache, committee: Committee, + protocol_config: ProtocolConfig, client: NetworkClient, mut shutdown_receivers: Vec, rx_sequence: metered_channel::Receiver, metrics: Arc, restored_consensus_output: Vec, state: State, - protocol_config: ProtocolConfig, ) -> Vec> { // This is ugly but has to be done this way for now // Currently network incorporate both server and client side of RPC interface @@ -85,13 +85,13 @@ pub fn spawn_subscriber( authority_id, worker_cache, committee, + protocol_config.clone(), rx_shutdown_subscriber, rx_sequence, client, metrics, restored_consensus_output, tx_notifier, - protocol_config.clone(), ), "SubscriberTask" ), @@ -121,13 +121,13 @@ async fn create_and_run_subscriber( authority_id: AuthorityIdentifier, worker_cache: WorkerCache, committee: Committee, 
+ protocol_config: ProtocolConfig, rx_shutdown: ConditionalBroadcastReceiver, rx_sequence: metered_channel::Receiver, client: NetworkClient, metrics: Arc, restored_consensus_output: Vec, tx_notifier: metered_channel::Sender, - protocol_config: ProtocolConfig, ) { info!("Starting subscriber"); let subscriber = Subscriber { @@ -136,10 +136,10 @@ async fn create_and_run_subscriber( inner: Arc::new(Inner { authority_id, committee, + protocol_config, worker_cache, client, metrics, - protocol_config, }), }; subscriber @@ -390,7 +390,7 @@ impl Subscriber { ); inner .metrics - .batch_execution_latency_without_network_latency + .batch_execution_local_latency .with_label_values(&["other"]) .observe(remote_duration); } else { @@ -405,7 +405,7 @@ impl Subscriber { ); inner .metrics - .batch_execution_latency_without_network_latency + .batch_execution_local_latency .with_label_values(&["own"]) .observe(local_duration); }; diff --git a/narwhal/executor/tests/consensus_integration_tests.rs b/narwhal/executor/tests/consensus_integration_tests.rs index 32aebcb90bf7e9..4d2fd739e311a7 100644 --- a/narwhal/executor/tests/consensus_integration_tests.rs +++ b/narwhal/executor/tests/consensus_integration_tests.rs @@ -40,27 +40,27 @@ async fn test_recovery() { .collect::>(); let (mut certificates, next_parents) = test_utils::make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=2, &genesis, &ids, - &latest_protocol_version(), ); // Make two certificate (f+1) with round 3 to trigger the commits. let (_, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), ids[0], 3, next_parents.clone(), - &latest_protocol_version(), ); certificates.push_back(certificate); let (_, certificate) = test_utils::mock_certificate( &committee, + &latest_protocol_version(), ids[1], 3, next_parents, - &latest_protocol_version(), ); certificates.push_back(certificate); diff --git a/narwhal/network/src/client.rs b/narwhal/network/src/client.rs index 12a8f43c6fbf8b..08d2ff4d290a5c 100644 --- a/narwhal/network/src/client.rs +++ b/narwhal/network/src/client.rs @@ -12,7 +12,7 @@ use tokio::{select, time::sleep}; use tracing::debug; use types::{ error::LocalClientError, FetchBatchesRequest, FetchBatchesResponse, PrimaryToWorker, - WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOurBatchMessageV2, + WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOwnBatchMessage, WorkerSynchronizeMessage, WorkerToPrimary, }; @@ -174,6 +174,7 @@ impl PrimaryToWorkerClient for NetworkClient { #[async_trait] impl WorkerToPrimaryClient for NetworkClient { + // TODO: Remove once we have upgraded to protocol version 12. async fn report_our_batch( &self, request: WorkerOurBatchMessage, @@ -189,13 +190,13 @@ impl WorkerToPrimaryClient for NetworkClient { }, } } - async fn report_our_batch_v2( + async fn report_own_batch( &self, - request: WorkerOurBatchMessageV2, + request: WorkerOwnBatchMessage, ) -> Result<(), LocalClientError> { let c = self.get_worker_to_primary_handler().await?; select! 
{ - resp = c.report_our_batch_v2(Request::new(request)) => { + resp = c.report_own_batch(Request::new(request)) => { resp.map_err(|e| LocalClientError::Internal(format!("{e:?}")))?; Ok(()) }, diff --git a/narwhal/network/src/traits.rs b/narwhal/network/src/traits.rs index fcb16fa7d9f7e2..b61e3fef5dd812 100644 --- a/narwhal/network/src/traits.rs +++ b/narwhal/network/src/traits.rs @@ -9,7 +9,7 @@ use types::{ error::LocalClientError, Batch, BatchDigest, FetchBatchesRequest, FetchBatchesResponse, FetchCertificatesRequest, FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, RequestBatchesRequest, RequestBatchesResponse, - WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOurBatchMessageV2, + WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOwnBatchMessage, WorkerSynchronizeMessage, }; @@ -98,14 +98,15 @@ pub trait PrimaryToWorkerClient { #[async_trait] pub trait WorkerToPrimaryClient { + // TODO: Remove once we have upgraded to protocol version 12. async fn report_our_batch( &self, request: WorkerOurBatchMessage, ) -> Result<(), LocalClientError>; - async fn report_our_batch_v2( + async fn report_own_batch( &self, - request: WorkerOurBatchMessageV2, + request: WorkerOwnBatchMessage, ) -> Result<(), LocalClientError>; async fn report_others_batch( diff --git a/narwhal/node/src/generate_format.rs b/narwhal/node/src/generate_format.rs index 95ea142b292025..cc45556cd4ff61 100644 --- a/narwhal/node/src/generate_format.rs +++ b/narwhal/node/src/generate_format.rs @@ -14,7 +14,7 @@ use structopt::{clap::arg_enum, StructOpt}; use types::{ Batch, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, HeaderV1Builder, Metadata, MetadataV1, VersionedMetadata, WorkerOthersBatchMessage, WorkerOurBatchMessage, - WorkerOurBatchMessageV2, WorkerSynchronizeMessage, + WorkerOwnBatchMessage, WorkerSynchronizeMessage, }; #[allow(clippy::mutable_key_type)] @@ -116,7 +116,7 @@ fn get_registry() -> Result { worker_id: 0, metadata: Metadata { created_at: 0 }, }; - let our_batch_v2 = WorkerOurBatchMessageV2 { + let our_batch_v2 = WorkerOwnBatchMessage { digest: BatchDigest([0u8; 32]), worker_id: 0, metadata: VersionedMetadata::V1(MetadataV1 { diff --git a/narwhal/node/src/main.rs b/narwhal/node/src/main.rs index 7d8de31e8d6114..4bc99044ae93da 100644 --- a/narwhal/node/src/main.rs +++ b/narwhal/node/src/main.rs @@ -282,10 +282,10 @@ async fn run( // Spawn the primary and consensus core. ("primary", Some(sub_matches)) => { let primary = PrimaryNode::new( + ProtocolConfig::get_for_version(ProtocolVersion::max()), parameters.clone(), !sub_matches.is_present("consensus-disabled"), registry_service, - ProtocolConfig::get_for_version(ProtocolVersion::max()), ); primary @@ -313,9 +313,9 @@ async fn run( let worker = WorkerNode::new( id, + ProtocolConfig::get_for_version(ProtocolVersion::max()), parameters.clone(), registry_service, - ProtocolConfig::get_for_version(ProtocolVersion::max()), ); worker diff --git a/narwhal/node/src/primary_node.rs b/narwhal/node/src/primary_node.rs index 4d98f4ef330f33..391c840dd25f82 100644 --- a/narwhal/node/src/primary_node.rs +++ b/narwhal/node/src/primary_node.rs @@ -27,6 +27,7 @@ use tracing::{debug, info, instrument}; use types::{Certificate, ConditionalBroadcastReceiver, PreSubscribedBroadcastSender, Round}; struct PrimaryNodeInner { + protocol_config: ProtocolConfig, // The configuration parameters. parameters: Parameters, // Whether to run consensus (and an executor client) or not. 
@@ -47,8 +48,6 @@ struct PrimaryNodeInner { tx_shutdown: Option, // Peer ID used for local connections. own_peer_id: Option, - // The protocol configuration. - protocol_config: ProtocolConfig, } impl PrimaryNodeInner { @@ -102,10 +101,10 @@ impl PrimaryNodeInner { worker_cache, client, store, + self.protocol_config.clone(), self.parameters.clone(), self.internal_consensus, execution_state, - self.protocol_config.clone(), ®istry, &mut tx_shutdown, ) @@ -198,6 +197,7 @@ impl PrimaryNodeInner { client: NetworkClient, // The node's storage. store: &NodeStorage, + protocol_config: ProtocolConfig, // The configuration parameters. parameters: Parameters, // Whether to run consensus (and an executor client) or not. @@ -208,8 +208,6 @@ impl PrimaryNodeInner { internal_consensus: bool, // The state used by the client to execute transactions. execution_state: Arc, - // The protocol configuration. - protocol_config: ProtocolConfig, // A prometheus exporter Registry to use for the metrics registry: &Registry, // The channel to send the shutdown signal @@ -267,6 +265,7 @@ impl PrimaryNodeInner { committee.clone(), client.clone(), store, + &protocol_config, parameters.clone(), execution_state, tx_shutdown.subscribe_n(3), @@ -274,7 +273,6 @@ impl PrimaryNodeInner { tx_committed_certificates.clone(), tx_consensus_round_updates, registry, - &protocol_config, ) .await?; @@ -293,6 +291,7 @@ impl PrimaryNodeInner { network_keypair, committee.clone(), worker_cache.clone(), + protocol_config.clone(), parameters.clone(), client, store.header_store.clone(), @@ -307,7 +306,6 @@ impl PrimaryNodeInner { tx_shutdown, tx_committed_certificates, registry, - protocol_config.clone(), ); handles.extend(primary_handles); @@ -321,6 +319,7 @@ impl PrimaryNodeInner { committee: Committee, client: NetworkClient, store: &NodeStorage, + protocol_config: &ProtocolConfig, parameters: Parameters, execution_state: State, mut shutdown_receivers: Vec, @@ -328,7 +327,6 @@ impl PrimaryNodeInner { tx_committed_certificates: metered_channel::Sender<(Round, Vec)>, tx_consensus_round_updates: watch::Sender, registry: &Registry, - protocol_config: &ProtocolConfig, ) -> SubscriberResult>> where PublicKey: VerifyingKey, @@ -385,13 +383,13 @@ impl PrimaryNodeInner { authority_id, worker_cache, committee.clone(), + protocol_config, client, execution_state, shutdown_receivers, rx_sequence, registry, restored_consensus_output, - protocol_config, )?; Ok(executor_handles @@ -408,12 +406,13 @@ pub struct PrimaryNode { impl PrimaryNode { pub fn new( + protocol_config: ProtocolConfig, parameters: Parameters, internal_consensus: bool, registry_service: RegistryService, - protocol_config: ProtocolConfig, ) -> PrimaryNode { let inner = PrimaryNodeInner { + protocol_config, parameters, internal_consensus, registry_service, @@ -422,7 +421,6 @@ impl PrimaryNode { client: None, tx_shutdown: None, own_peer_id: None, - protocol_config, }; Self { diff --git a/narwhal/node/src/worker_node.rs b/narwhal/node/src/worker_node.rs index c58dd28fb8f635..d2db669ef8f640 100644 --- a/narwhal/node/src/worker_node.rs +++ b/narwhal/node/src/worker_node.rs @@ -26,6 +26,7 @@ use worker::{TransactionValidator, Worker, NUM_SHUTDOWN_RECEIVERS}; pub struct WorkerNodeInner { // The worker's id id: WorkerId, + protocol_config: ProtocolConfig, // The configuration parameters. parameters: Parameters, // A prometheus RegistryService to use for the metrics @@ -38,8 +39,6 @@ pub struct WorkerNodeInner { tx_shutdown: Option, // Peer ID used for local connections. 
own_peer_id: Option, - // The protocol configuration. - protocol_config: ProtocolConfig, } impl WorkerNodeInner { @@ -99,13 +98,13 @@ impl WorkerNodeInner { self.id, committee.clone(), worker_cache.clone(), + self.protocol_config.clone(), self.parameters.clone(), tx_validator.clone(), client.clone(), store.batch_store.clone(), metrics, &mut tx_shutdown, - self.protocol_config.clone(), ); // store the registry @@ -186,19 +185,19 @@ pub struct WorkerNode { impl WorkerNode { pub fn new( id: WorkerId, + protocol_config: ProtocolConfig, parameters: Parameters, registry_service: RegistryService, - protocol_config: ProtocolConfig, ) -> WorkerNode { let inner = WorkerNodeInner { id, + protocol_config, parameters, registry_service, registry: None, handles: FuturesUnordered::new(), tx_shutdown: None, own_peer_id: None, - protocol_config, }; Self { @@ -261,24 +260,24 @@ pub struct WorkerNodes { workers: ArcSwap>, registry_service: RegistryService, registry_id: ArcSwapOption, + protocol_config: ProtocolConfig, parameters: Parameters, client: ArcSwapOption, - protocol_config: ProtocolConfig, } impl WorkerNodes { pub fn new( registry_service: RegistryService, - parameters: Parameters, protocol_config: ProtocolConfig, + parameters: Parameters, ) -> Self { Self { workers: ArcSwap::from(Arc::new(HashMap::default())), registry_service, registry_id: ArcSwapOption::empty(), + protocol_config, parameters, client: ArcSwapOption::empty(), - protocol_config, } } @@ -322,9 +321,9 @@ impl WorkerNodes { for (worker_id, key_pair) in ids_and_keypairs { let worker = WorkerNode::new( worker_id, + self.protocol_config.clone(), self.parameters.clone(), self.registry_service.clone(), - self.protocol_config.clone(), ); worker diff --git a/narwhal/node/tests/node_test.rs b/narwhal/node/tests/node_test.rs index ecaa9c3222bd35..2bf8876a19ab43 100644 --- a/narwhal/node/tests/node_test.rs +++ b/narwhal/node/tests/node_test.rs @@ -44,10 +44,10 @@ async fn simple_primary_worker_node_start_stop() { // WHEN let primary_node = PrimaryNode::new( + latest_protocol_version(), parameters.clone(), true, registry_service.clone(), - latest_protocol_version(), ); primary_node .start( @@ -65,8 +65,8 @@ async fn simple_primary_worker_node_start_stop() { // AND let workers = WorkerNodes::new( registry_service, - parameters.clone(), latest_protocol_version(), + parameters.clone(), ); workers @@ -135,10 +135,10 @@ async fn primary_node_restart() { // AND let primary_node = PrimaryNode::new( + latest_protocol_version(), parameters.clone(), true, registry_service.clone(), - latest_protocol_version(), ); primary_node .start( diff --git a/narwhal/node/tests/staged/narwhal.yaml b/narwhal/node/tests/staged/narwhal.yaml index ccd685dd601f6a..bef65fc37750a0 100644 --- a/narwhal/node/tests/staged/narwhal.yaml +++ b/narwhal/node/tests/staged/narwhal.yaml @@ -121,7 +121,7 @@ WorkerOurBatchMessage: - worker_id: U32 - metadata: TYPENAME: Metadata -WorkerOurBatchMessageV2: +WorkerOwnBatchMessage: STRUCT: - digest: TYPENAME: BatchDigest @@ -136,4 +136,3 @@ WorkerSynchronizeMessage: - target: TYPENAME: AuthorityIdentifier - is_certified: BOOL - diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 6c1a4778dc311a..558428c8f5b868 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -72,7 +72,7 @@ use types::{ MetadataAPI, PayloadAvailabilityRequest, PayloadAvailabilityResponse, PreSubscribedBroadcastSender, PrimaryToPrimary, PrimaryToPrimaryServer, RequestVoteRequest, RequestVoteResponse, Round, 
SendCertificateRequest, SendCertificateResponse, Vote, VoteInfoAPI, - WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOurBatchMessageV2, WorkerToPrimary, + WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOwnBatchMessage, WorkerToPrimary, WorkerToPrimaryServer, }; @@ -100,6 +100,7 @@ impl Primary { network_signer: NetworkKeyPair, committee: Committee, worker_cache: WorkerCache, + _protocol_config: ProtocolConfig, parameters: Parameters, client: NetworkClient, header_store: HeaderStore, @@ -114,7 +115,6 @@ impl Primary { tx_shutdown: &mut PreSubscribedBroadcastSender, tx_committed_certificates: Sender<(Round, Vec)>, registry: &Registry, - _protocol_config: ProtocolConfig, ) -> Vec> { // Write the parameters to the logs. parameters.tracing(); @@ -1184,6 +1184,7 @@ struct WorkerReceiverHandler { #[async_trait] impl WorkerToPrimary for WorkerReceiverHandler { + // TODO: Remove once we have upgraded to protocol version 12. async fn report_our_batch( &self, request: anemo::Request, @@ -1211,9 +1212,9 @@ impl WorkerToPrimary for WorkerReceiverHandler { Ok(response) } - async fn report_our_batch_v2( + async fn report_own_batch( &self, - request: anemo::Request, + request: anemo::Request, ) -> Result, anemo::rpc::Status> { let message = request.into_body(); diff --git a/narwhal/primary/src/tests/primary_tests.rs b/narwhal/primary/src/tests/primary_tests.rs index 8d94d3d4a92d93..d80a44d77ab7dc 100644 --- a/narwhal/primary/src/tests/primary_tests.rs +++ b/narwhal/primary/src/tests/primary_tests.rs @@ -93,6 +93,7 @@ async fn get_network_peers_from_admin_server() { authority_1.network_keypair().copy(), committee.clone(), worker_cache.clone(), + test_utils::latest_protocol_version(), primary_1_parameters.clone(), client_1.clone(), store.header_store.clone(), @@ -116,7 +117,6 @@ async fn get_network_peers_from_admin_server() { &mut tx_shutdown, tx_feedback, &Registry::new(), - test_utils::latest_protocol_version(), ); // Wait for tasks to start @@ -139,13 +139,13 @@ async fn get_network_peers_from_admin_server() { worker_id, committee.clone(), worker_cache.clone(), + test_utils::latest_protocol_version(), worker_1_parameters.clone(), TrivialTransactionValidator::default(), client_1, store.batch_store, metrics_1, &mut tx_shutdown_worker, - test_utils::latest_protocol_version(), ); // Test getting all known peers for primary 1 @@ -218,6 +218,7 @@ async fn get_network_peers_from_admin_server() { authority_2.network_keypair().copy(), committee.clone(), worker_cache.clone(), + test_utils::latest_protocol_version(), primary_2_parameters.clone(), client_2.clone(), store.header_store.clone(), @@ -241,7 +242,6 @@ async fn get_network_peers_from_admin_server() { &mut tx_shutdown_2, tx_feedback_2, &Registry::new(), - test_utils::latest_protocol_version(), ); // Wait for tasks to start @@ -364,8 +364,8 @@ async fn test_request_vote_has_missing_parents() { 1..=3, &genesis, &committee, - ids.as_slice(), &test_utils::latest_protocol_version(), + ids.as_slice(), ); let all_certificates = certificates.into_iter().collect_vec(); let round_2_certs = all_certificates[NUM_PARENTS..(NUM_PARENTS * 2)].to_vec(); @@ -535,8 +535,8 @@ async fn test_request_vote_accept_missing_parents() { 1..=3, &genesis, &committee, - ids.as_slice(), &test_utils::latest_protocol_version(), + ids.as_slice(), ); let all_certificates = certificates.into_iter().collect_vec(); let round_1_certs = all_certificates[..NUM_PARENTS].to_vec(); diff --git a/narwhal/primary/src/tests/synchronizer_tests.rs 
b/narwhal/primary/src/tests/synchronizer_tests.rs index b6766c42d6855d..7e7f78c7ee2529 100644 --- a/narwhal/primary/src/tests/synchronizer_tests.rs +++ b/narwhal/primary/src/tests/synchronizer_tests.rs @@ -180,8 +180,8 @@ async fn accept_suspended_certificates() { 1..=5, &genesis, &committee, - keys.as_slice(), &latest_protocol_version(), + keys.as_slice(), ); let certificates = certificates.into_iter().collect_vec(); @@ -532,8 +532,8 @@ async fn synchronizer_recover_previous_round() { 1..=2, &genesis, &committee, - &keys, &latest_protocol_version(), + &keys, ); let all_certificates: Vec<_> = all_certificates.into_iter().collect(); let round_1_certificates = all_certificates[0..3].to_vec(); @@ -652,8 +652,8 @@ async fn deliver_certificate_using_dag() { 1..=4, &genesis, &committee, - &keys, &latest_protocol_version(), + &keys, ); // insert the certificates in the DAG @@ -724,8 +724,8 @@ async fn deliver_certificate_using_store() { 1..=4, &genesis, &committee, - &keys, &latest_protocol_version(), + &keys, ); // insert the certificates in the DAG @@ -796,8 +796,8 @@ async fn deliver_certificate_not_found_parents() { 1..=4, &genesis, &committee, - &keys, &latest_protocol_version(), + &keys, ); // take the last one (top) and test for parents @@ -972,8 +972,8 @@ async fn gc_suspended_certificates() { 1..=5, &genesis, &committee, - keys.as_slice(), &latest_protocol_version(), + keys.as_slice(), ); let certificates = certificates.into_iter().collect_vec(); diff --git a/narwhal/primary/tests/integration_tests_proposer_api.rs b/narwhal/primary/tests/integration_tests_proposer_api.rs index ff5b49762a89f5..8569321c456d9d 100644 --- a/narwhal/primary/tests/integration_tests_proposer_api.rs +++ b/narwhal/primary/tests/integration_tests_proposer_api.rs @@ -114,6 +114,7 @@ async fn test_rounds_errors() { network_keypair, committee.clone(), worker_cache, + latest_protocol_version(), parameters.clone(), client, store_primary.header_store, @@ -137,7 +138,6 @@ async fn test_rounds_errors() { &mut tx_shutdown, tx_feedback, &Registry::new(), - latest_protocol_version(), ); // AND Wait for tasks to start @@ -217,6 +217,7 @@ async fn test_rounds_return_successful_response() { author.network_keypair().copy(), committee.clone(), worker_cache, + latest_protocol_version(), parameters.clone(), client, store_primary.header_store, @@ -231,7 +232,6 @@ async fn test_rounds_return_successful_response() { &mut tx_shutdown, tx_feedback, &Registry::new(), - latest_protocol_version(), ); // AND Wait for tasks to start @@ -246,13 +246,13 @@ async fn test_rounds_return_successful_response() { .collect::>(); let (mut certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=4, &genesis, &committee .authorities() .map(|authority| authority.id()) .collect::>(), - &latest_protocol_version(), ); // Feed the certificates to the Dag @@ -339,8 +339,8 @@ async fn test_node_read_causal_signed_certificates() { 1..=4, &genesis, &committee, - &keys, &latest_protocol_version(), + &keys, ); collection_ids.extend( @@ -387,6 +387,7 @@ async fn test_node_read_causal_signed_certificates() { authority_1.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), primary_1_parameters.clone(), client_1, primary_store_1.header_store.clone(), @@ -401,7 +402,6 @@ async fn test_node_read_causal_signed_certificates() { &mut tx_shutdown, tx_feedback, &Registry::new(), - latest_protocol_version(), ); let (tx_new_certificates_2, rx_new_certificates_2) = @@ -426,6 +426,7 @@ 
async fn test_node_read_causal_signed_certificates() { authority_2.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), primary_2_parameters.clone(), client_2, primary_store_2.header_store, @@ -449,7 +450,6 @@ async fn test_node_read_causal_signed_certificates() { &mut tx_shutdown_2, tx_feedback_2, &Registry::new(), - latest_protocol_version(), ); // Wait for tasks to start diff --git a/narwhal/primary/tests/integration_tests_validator_api.rs b/narwhal/primary/tests/integration_tests_validator_api.rs index 093e2ae17ee8b4..ba26e62df3f2ad 100644 --- a/narwhal/primary/tests/integration_tests_validator_api.rs +++ b/narwhal/primary/tests/integration_tests_validator_api.rs @@ -111,6 +111,7 @@ async fn test_get_collections() { author.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters.clone(), client.clone(), store.header_store.clone(), @@ -134,7 +135,6 @@ async fn test_get_collections() { &mut tx_shutdown, tx_feedback, &Registry::new(), - latest_protocol_version(), ); let registry = Registry::new(); @@ -148,13 +148,13 @@ async fn test_get_collections() { worker_id, committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters.clone(), TrivialTransactionValidator::default(), client, store.batch_store.clone(), metrics, &mut tx_shutdown_worker, - latest_protocol_version(), ); // Wait for tasks to start @@ -321,6 +321,7 @@ async fn test_remove_collections() { author.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters.clone(), network_client.clone(), store.header_store.clone(), @@ -335,7 +336,6 @@ async fn test_remove_collections() { &mut tx_shutdown, tx_feedback, &Registry::new(), - latest_protocol_version(), ); // Wait for tasks to start @@ -380,13 +380,13 @@ async fn test_remove_collections() { worker_id, committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters.clone(), TrivialTransactionValidator::default(), network_client, store.batch_store.clone(), metrics, &mut tx_shutdown_worker, - latest_protocol_version(), ); // Test remove no collections @@ -533,8 +533,8 @@ async fn test_read_causal_signed_certificates() { 1..=4, &genesis, &committee, - &keys, &latest_protocol_version(), + &keys, ); collection_digests.extend( @@ -580,6 +580,7 @@ async fn test_read_causal_signed_certificates() { authority_1.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), primary_1_parameters.clone(), client_1, primary_store_1.header_store.clone(), @@ -594,7 +595,6 @@ async fn test_read_causal_signed_certificates() { &mut tx_shutdown, tx_feedback, &Registry::new(), - latest_protocol_version(), ); let (tx_new_certificates_2, rx_new_certificates_2) = @@ -620,6 +620,7 @@ async fn test_read_causal_signed_certificates() { authority_2.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), primary_2_parameters.clone(), client_2, primary_store_2.header_store, @@ -643,7 +644,6 @@ async fn test_read_causal_signed_certificates() { &mut tx_shutdown_2, tx_feedback_2, &Registry::new(), - latest_protocol_version(), ); // Wait for tasks to start @@ -763,13 +763,13 @@ async fn test_read_causal_unsigned_certificates() { let (certificates, _next_parents) = make_optimal_certificates( &committee, + &latest_protocol_version(), 1..=4, &genesis, &committee .authorities() .map(|authority| authority.id()) .collect::>(), - &latest_protocol_version(), ); 
collection_digests.extend( @@ -809,6 +809,7 @@ async fn test_read_causal_unsigned_certificates() { authority_1.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), primary_1_parameters.clone(), client_1, primary_store_1.header_store.clone(), @@ -823,7 +824,6 @@ async fn test_read_causal_unsigned_certificates() { &mut tx_shutdown, tx_feedback, &Registry::new(), - latest_protocol_version(), ); let (tx_new_certificates_2, rx_new_certificates_2) = @@ -843,6 +843,7 @@ async fn test_read_causal_unsigned_certificates() { network_keypair_2, committee.clone(), worker_cache.clone(), + latest_protocol_version(), primary_2_parameters.clone(), client_2, primary_store_2.header_store, @@ -866,7 +867,6 @@ async fn test_read_causal_unsigned_certificates() { &mut tx_shutdown_2, tx_feedback_2, &Registry::new(), - latest_protocol_version(), ); // Wait for tasks to start @@ -1023,6 +1023,7 @@ async fn test_get_collections_with_missing_certificates() { authority_1.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters_1.clone(), client_1.clone(), store_primary_1.header_store, @@ -1046,7 +1047,6 @@ async fn test_get_collections_with_missing_certificates() { &mut tx_shutdown, tx_feedback_1, &Registry::new(), - latest_protocol_version(), ); let registry_1 = Registry::new(); @@ -1060,13 +1060,13 @@ async fn test_get_collections_with_missing_certificates() { worker_id, committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters_1.clone(), TrivialTransactionValidator::default(), client_1, store_primary_1.batch_store, metrics_1, &mut tx_shutdown_worker_1, - latest_protocol_version(), ); // Spawn the primary 2 - a peer to fetch missing certificates from @@ -1096,6 +1096,7 @@ async fn test_get_collections_with_missing_certificates() { authority_2.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters_2.clone(), client_2.clone(), store_primary_2.header_store, @@ -1111,7 +1112,6 @@ async fn test_get_collections_with_missing_certificates() { &mut tx_shutdown_2, tx_feedback_2, &Registry::new(), - latest_protocol_version(), ); let registry_2 = Registry::new(); @@ -1125,13 +1125,13 @@ async fn test_get_collections_with_missing_certificates() { worker_id, committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters_2.clone(), TrivialTransactionValidator::default(), client_2, store_primary_2.batch_store, metrics_2, &mut tx_shutdown_worker_2, - latest_protocol_version(), ); // Wait for tasks to start diff --git a/narwhal/test-utils/src/cluster.rs b/narwhal/test-utils/src/cluster.rs index 83a2d88c60e560..4035af40213af5 100644 --- a/narwhal/test-utils/src/cluster.rs +++ b/narwhal/test-utils/src/cluster.rs @@ -1,6 +1,6 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use crate::{temp_dir, CommitteeFixture}; +use crate::{latest_protocol_version, temp_dir, CommitteeFixture}; use config::{AuthorityIdentifier, Committee, Parameters, WorkerCache, WorkerId}; use crypto::{KeyPair, NetworkKeyPair, PublicKey}; use executor::SerializedTransaction; @@ -15,7 +15,6 @@ use node::{execution_state::SimpleExecutionState, metrics::worker_metrics_regist use prometheus::{proto::Metric, Registry}; use std::{cell::RefCell, collections::HashMap, path::PathBuf, rc::Rc, sync::Arc, time::Duration}; use storage::NodeStorage; -use sui_protocol_config::{ProtocolConfig, ProtocolVersion}; use telemetry_subscribers::TelemetryGuards; use tokio::{ sync::{broadcast::Sender, mpsc::channel, RwLock}, @@ -305,10 +304,10 @@ impl PrimaryNodeDetails { let registry_service = RegistryService::new(Registry::new()); let node = PrimaryNode::new( + latest_protocol_version(), parameters.clone(), internal_consensus_enabled, registry_service, - ProtocolConfig::get_for_version(ProtocolVersion::max()), ); Self { @@ -431,12 +430,7 @@ impl WorkerNodeDetails { worker_cache: WorkerCache, ) -> Self { let registry_service = RegistryService::new(Registry::new()); - let node = WorkerNode::new( - id, - parameters, - registry_service, - ProtocolConfig::get_for_version(ProtocolVersion::max()), - ); + let node = WorkerNode::new(id, latest_protocol_version(), parameters, registry_service); Self { id, diff --git a/narwhal/test-utils/src/lib.rs b/narwhal/test-utils/src/lib.rs index 6030cf707440a7..1b29f3f44b08ed 100644 --- a/narwhal/test-utils/src/lib.rs +++ b/narwhal/test-utils/src/lib.rs @@ -441,12 +441,12 @@ pub fn create_batch_store() -> DBMap { // Note : the certificates are unsigned pub fn make_optimal_certificates( committee: &Committee, + protocol_config: &ProtocolConfig, range: RangeInclusive, initial_parents: &BTreeSet, ids: &[AuthorityIdentifier], - protocol_config: &ProtocolConfig, ) -> (VecDeque, BTreeSet) { - make_certificates(committee, range, initial_parents, ids, 0.0, protocol_config) + make_certificates(committee, protocol_config, range, initial_parents, ids, 0.0) } // Outputs rounds worth of certificates with optimal parents, signed @@ -454,16 +454,16 @@ pub fn make_optimal_signed_certificates( range: RangeInclusive, initial_parents: &BTreeSet, committee: &Committee, - keys: &[(AuthorityIdentifier, KeyPair)], protocol_config: &ProtocolConfig, + keys: &[(AuthorityIdentifier, KeyPair)], ) -> (VecDeque, BTreeSet) { make_signed_certificates( range, initial_parents, committee, + protocol_config, keys, 0.0, - protocol_config, ) } @@ -516,14 +516,14 @@ fn rounds_of_certificates( // make rounds worth of unsigned certificates with the sampled number of parents pub fn make_certificates( committee: &Committee, + protocol_config: &ProtocolConfig, range: RangeInclusive, initial_parents: &BTreeSet, ids: &[AuthorityIdentifier], failure_probability: f64, - protocol_config: &ProtocolConfig, ) -> (VecDeque, BTreeSet) { let generator = - |pk, round, parents| mock_certificate(committee, pk, round, parents, protocol_config); + |pk, round, parents| mock_certificate(committee, protocol_config, pk, round, parents); rounds_of_certificates(range, initial_parents, ids, failure_probability, generator) } @@ -538,11 +538,11 @@ pub fn make_certificates( // produced. 
pub fn make_certificates_with_slow_nodes( committee: &Committee, + protocol_config: &ProtocolConfig, range: RangeInclusive, initial_parents: Vec, names: &[AuthorityIdentifier], slow_nodes: &[(AuthorityIdentifier, f64)], - protocol_config: &ProtocolConfig, ) -> (VecDeque, Vec) { let mut rand = StdRng::seed_from_u64(1); @@ -570,7 +570,7 @@ pub fn make_certificates_with_slow_nodes( ); let (_, certificate) = - mock_certificate(committee, *name, round, this_cert_parents, protocol_config); + mock_certificate(committee, protocol_config, *name, round, this_cert_parents); certificates.push_back(certificate.clone()); next_parents.push(certificate); } @@ -645,11 +645,11 @@ pub fn this_cert_parents_with_slow_nodes( // make rounds worth of unsigned certificates with the sampled number of parents pub fn make_certificates_with_epoch( committee: &Committee, + protocol_config: &ProtocolConfig, range: RangeInclusive, epoch: Epoch, initial_parents: &BTreeSet, keys: &[AuthorityIdentifier], - protocol_config: &ProtocolConfig, ) -> (VecDeque, BTreeSet) { let mut certificates = VecDeque::new(); let mut parents = initial_parents.iter().cloned().collect::>(); @@ -660,11 +660,11 @@ pub fn make_certificates_with_epoch( for name in keys { let (digest, certificate) = mock_certificate_with_epoch( committee, + protocol_config, *name, round, epoch, parents.clone(), - protocol_config, ); certificates.push_back(certificate); next_parents.insert(digest); @@ -679,9 +679,9 @@ pub fn make_signed_certificates( range: RangeInclusive, initial_parents: &BTreeSet, committee: &Committee, + protocol_config: &ProtocolConfig, keys: &[(AuthorityIdentifier, KeyPair)], failure_probability: f64, - protocol_config: &ProtocolConfig, ) -> (VecDeque, BTreeSet) { let ids = keys .iter() @@ -702,11 +702,11 @@ pub fn make_signed_certificates( pub fn mock_certificate_with_rand( committee: &Committee, + protocol_config: &ProtocolConfig, origin: AuthorityIdentifier, round: Round, parents: BTreeSet, rand: &mut R, - protocol_config: &ProtocolConfig, ) -> (CertificateDigest, Certificate) { let header_builder = HeaderV1Builder::default(); let header = header_builder @@ -730,23 +730,23 @@ pub fn mock_certificate_with_rand( // Note: the certificate is signed by a random key rather than its author pub fn mock_certificate( committee: &Committee, + protocol_config: &ProtocolConfig, origin: AuthorityIdentifier, round: Round, parents: BTreeSet, - protocol_config: &ProtocolConfig, ) -> (CertificateDigest, Certificate) { - mock_certificate_with_epoch(committee, origin, round, 0, parents, protocol_config) + mock_certificate_with_epoch(committee, protocol_config, origin, round, 0, parents) } // Creates a badly signed certificate from its given round, epoch, origin, and parents, // Note: the certificate is signed by a random key rather than its author pub fn mock_certificate_with_epoch( committee: &Committee, + protocol_config: &ProtocolConfig, origin: AuthorityIdentifier, round: Round, epoch: Epoch, parents: BTreeSet, - protocol_config: &ProtocolConfig, ) -> (CertificateDigest, Certificate) { let header_builder = HeaderV1Builder::default(); let header = header_builder diff --git a/narwhal/types/benches/verify_certificate.rs b/narwhal/types/benches/verify_certificate.rs index 2f162344f4270d..63115c0dbaf0ec 100644 --- a/narwhal/types/benches/verify_certificate.rs +++ b/narwhal/types/benches/verify_certificate.rs @@ -28,10 +28,10 @@ pub fn verify_certificates(c: &mut Criterion) { .collect::>(); let (certificates, _next_parents) = make_optimal_certificates( &committee, 
+ &latest_protocol_version(), 1..=1, &genesis, &ids, - &latest_protocol_version(), ); let certificate = certificates.front().unwrap().clone(); diff --git a/narwhal/types/build.rs b/narwhal/types/build.rs index 42d42670ea7ff3..ddc88e4fee8e77 100644 --- a/narwhal/types/build.rs +++ b/narwhal/types/build.rs @@ -136,6 +136,7 @@ fn build_anemo_services(out_dir: &Path) { .name("WorkerToPrimary") .package("narwhal") .attributes(automock_attribute.clone()) + // TODO: Remove once we have upgraded to protocol version 12. .method( anemo_build::manual::Method::builder() .name("report_our_batch") .route_name("ReportOurBatch") .request_type("crate::WorkerOurBatchMessage") .response_type("()") .codec_path(codec_path) .build(), ) .method( anemo_build::manual::Method::builder() - .name("report_our_batch_v2") - .route_name("ReportOurBatchV2") - .request_type("crate::WorkerOurBatchMessageV2") + .name("report_own_batch") + .route_name("ReportOwnBatch") + .request_type("crate::WorkerOwnBatchMessage") .response_type("()") .codec_path(codec_path) .build(), diff --git a/narwhal/types/src/primary.rs b/narwhal/types/src/primary.rs index 0fc9c02f1e8bf5..f331e7a3db83e1 100644 --- a/narwhal/types/src/primary.rs +++ b/narwhal/types/src/primary.rs @@ -116,9 +116,9 @@ pub struct MetadataV1 { // timestamp of when the entity created. This is generated // by the node which creates the entity. pub created_at: TimestampMs, - // timestamp of when the entity was received by the node. This will help + // timestamp of when the entity was received by another node. This will help // us calculate latencies that are not affected by clock drift or network - // delays. + // delays. This field is not set for own batches. pub received_at: Option, } @@ -245,9 +245,10 @@ impl BatchV1 { #[derive(Clone, Serialize, Deserialize, Debug, PartialEq, Eq, Arbitrary)] pub struct BatchV2 { + pub epoch: Epoch, pub transactions: Vec, + // This field is not included as part of the batch digest. pub versioned_metadata: VersionedMetadata, - pub epoch: Epoch, } impl BatchAPI for BatchV2 { @@ -1528,6 +1529,7 @@ impl fmt::Display for BlockErrorKind { } } +// TODO: Remove once we have upgraded to protocol version 12. /// Used by worker to inform primary it sealed a new batch. #[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)] pub struct WorkerOurBatchMessage { @@ -1538,7 +1540,7 @@ pub struct WorkerOurBatchMessage { /// Used by worker to inform primary it sealed a new batch. #[derive(Clone, Serialize, Deserialize, Eq, PartialEq, Debug)] -pub struct WorkerOurBatchMessageV2 { +pub struct WorkerOwnBatchMessage { pub digest: BatchDigest, pub worker_id: WorkerId, pub metadata: VersionedMetadata, diff --git a/narwhal/worker/src/batch_fetcher.rs b/narwhal/worker/src/batch_fetcher.rs index 2d2275aa5e8430..c419c61e1e8bca 100644 --- a/narwhal/worker/src/batch_fetcher.rs +++ b/narwhal/worker/src/batch_fetcher.rs @@ -32,6 +32,7 @@ use types::{ use crate::metrics::WorkerMetrics; const REMOTE_PARALLEL_FETCH_INTERVAL: Duration = Duration::from_secs(2); +const WORKER_RETRY_INTERVAL: Duration = Duration::from_secs(1); pub struct BatchFetcher { name: NetworkPublicKey, @@ -149,7 +150,8 @@ impl BatchFetcher { } // After all known remote workers have been tried, restart the outer loop to fetch - // from local storage then remain workers again. + // from local storage then remote workers again. 
+ sleep(WORKER_RETRY_INTERVAL).await; } } @@ -350,6 +352,7 @@ mod tests { use rand::rngs::StdRng; use std::collections::HashMap; use test_utils::{get_protocol_config, latest_protocol_version}; + use tokio::time::timeout; // TODO: Remove once we have upgraded to protocol version 12. // Case #1: Receive BatchV1 and network has not upgraded to 12 so we are okay @@ -401,66 +404,64 @@ mod tests { // Case #2: Receive BatchV1 but network has upgraded to 12 so we fail because we expect BatchV2 #[tokio::test] pub async fn test_fetcher_with_batch_v1_and_network_v12() { - // TODO: Enable once I figure out why the fetch call is not timing out. - // telemetry_subscribers::init_for_testing(); - // let mut network = TestRequestBatchesNetwork::new(); - // let batch_store = test_utils::create_batch_store(); - // let latest_protocol_config = latest_protocol_version(); - // let v11_protocol_config = &get_protocol_config(11); - // let batchv1_1 = Batch::new(vec![vec![1]], v11_protocol_config, 0); - // let batchv1_2 = Batch::new(vec![vec![2]], v11_protocol_config, 0); - // let (digests, known_workers) = ( - // HashSet::from_iter(vec![batchv1_1.digest(), batchv1_2.digest()]), - // HashSet::from_iter(test_pks(&[1, 2])), - // ); - // network.put(&[1, 2], batchv1_1.clone()); - // network.put(&[2, 3], batchv1_2.clone()); - // let fetcher = BatchFetcher { - // name: test_pk(0), - // network: Arc::new(network.clone()), - // batch_store: batch_store.clone(), - // metrics: Arc::new(WorkerMetrics::default()), - // protocol_config: latest_protocol_config, - // }; - // let fetch_result = timeout( - // Duration::from_secs(5), - // fetcher.fetch(digests, known_workers), - // ) - // .await; - // assert!(fetch_result.is_err()); + telemetry_subscribers::init_for_testing(); + let mut network = TestRequestBatchesNetwork::new(); + let batch_store = test_utils::create_batch_store(); + let latest_protocol_config = latest_protocol_version(); + let v11_protocol_config = &get_protocol_config(11); + let batchv1_1 = Batch::new(vec![vec![1]], v11_protocol_config, 0); + let batchv1_2 = Batch::new(vec![vec![2]], v11_protocol_config, 0); + let (digests, known_workers) = ( + HashSet::from_iter(vec![batchv1_1.digest(), batchv1_2.digest()]), + HashSet::from_iter(test_pks(&[1, 2])), + ); + network.put(&[1, 2], batchv1_1.clone()); + network.put(&[2, 3], batchv1_2.clone()); + let fetcher = BatchFetcher { + name: test_pk(0), + network: Arc::new(network.clone()), + batch_store: batch_store.clone(), + metrics: Arc::new(WorkerMetrics::default()), + protocol_config: latest_protocol_config, + }; + let fetch_result = timeout( + Duration::from_secs(1), + fetcher.fetch(digests, known_workers), + ) + .await; + assert!(fetch_result.is_err()); } // TODO: Remove once we have upgraded to protocol version 12. // Case #3: Receive BatchV2 but network is still in v11 so we fail because we expect BatchV1 #[tokio::test] pub async fn test_fetcher_with_batch_v2_and_network_v11() { - // TODO: Enable once I figure out why the fetch call is not timing out. 
- // telemetry_subscribers::init_for_testing(); - // let mut network = TestRequestBatchesNetwork::new(); - // let batch_store = test_utils::create_batch_store(); - // let latest_protocol_config = &latest_protocol_version(); - // let v11_protocol_config = get_protocol_config(11); - // let batchv2_1 = Batch::new(vec![vec![1]], latest_protocol_config, 0); - // let batchv2_2 = Batch::new(vec![vec![2]], latest_protocol_config, 0); - // let (digests, known_workers) = ( - // HashSet::from_iter(vec![batchv2_1.digest(), batchv2_2.digest()]), - // HashSet::from_iter(test_pks(&[1, 2])), - // ); - // network.put(&[1, 2], batchv2_1.clone()); - // network.put(&[2, 3], batchv2_2.clone()); - // let fetcher = BatchFetcher { - // name: test_pk(0), - // network: Arc::new(network.clone()), - // batch_store: batch_store.clone(), - // metrics: Arc::new(WorkerMetrics::default()), - // protocol_config: v11_protocol_config, - // }; - // let fetch_result = timeout( - // Duration::from_secs(5), - // fetcher.fetch(digests, known_workers), - // ) - // .await; - // assert!(fetch_result.is_err()); + telemetry_subscribers::init_for_testing(); + let mut network = TestRequestBatchesNetwork::new(); + let batch_store = test_utils::create_batch_store(); + let latest_protocol_config = &latest_protocol_version(); + let v11_protocol_config = get_protocol_config(11); + let batchv2_1 = Batch::new(vec![vec![1]], latest_protocol_config, 0); + let batchv2_2 = Batch::new(vec![vec![2]], latest_protocol_config, 0); + let (digests, known_workers) = ( + HashSet::from_iter(vec![batchv2_1.digest(), batchv2_2.digest()]), + HashSet::from_iter(test_pks(&[1, 2])), + ); + network.put(&[1, 2], batchv2_1.clone()); + network.put(&[2, 3], batchv2_2.clone()); + let fetcher = BatchFetcher { + name: test_pk(0), + network: Arc::new(network.clone()), + batch_store: batch_store.clone(), + metrics: Arc::new(WorkerMetrics::default()), + protocol_config: v11_protocol_config, + }; + let fetch_result = timeout( + Duration::from_secs(1), + fetcher.fetch(digests, known_workers), + ) + .await; + assert!(fetch_result.is_err()); } // TODO: Remove once we have upgraded to protocol version 12. diff --git a/narwhal/worker/src/batch_maker.rs b/narwhal/worker/src/batch_maker.rs index 9a419549a675e4..dc70be40daf4ff 100644 --- a/narwhal/worker/src/batch_maker.rs +++ b/narwhal/worker/src/batch_maker.rs @@ -21,7 +21,7 @@ use tokio::{ use tracing::{error, warn}; use types::{ error::DagError, now, Batch, BatchAPI, BatchDigest, ConditionalBroadcastReceiver, MetadataAPI, - Transaction, TxResponse, WorkerOurBatchMessage, WorkerOurBatchMessageV2, + Transaction, TxResponse, WorkerOurBatchMessage, WorkerOwnBatchMessage, }; #[cfg(feature = "trace_transaction")] @@ -296,12 +296,12 @@ impl BatchMaker { let _ = done_sending.await; // Send the batch to the primary. - let message = WorkerOurBatchMessageV2 { + let message = WorkerOwnBatchMessage { digest, worker_id, metadata, }; - if let Err(e) = client.report_our_batch_v2(message).await { + if let Err(e) = client.report_own_batch(message).await { warn!("Failed to report our batch: {}", e); // Drop all response handers to signal error, since we // cannot ensure the primary has actually signaled the diff --git a/narwhal/worker/src/handlers.rs b/narwhal/worker/src/handlers.rs index be2ef8ec54649d..dd07fe2959ac81 100644 --- a/narwhal/worker/src/handlers.rs +++ b/narwhal/worker/src/handlers.rs @@ -33,11 +33,11 @@ pub mod handlers_tests; /// Defines how the network receiver handles incoming workers messages. 
#[derive(Clone)] pub struct WorkerReceiverHandler { + pub protocol_config: ProtocolConfig, pub id: WorkerId, pub client: NetworkClient, pub store: DBMap, pub validator: V, - pub protocol_config: ProtocolConfig, } #[async_trait] @@ -141,6 +141,7 @@ pub struct PrimaryReceiverHandler { pub id: WorkerId, // The committee information. pub committee: Committee, + pub protocol_config: ProtocolConfig, // The worker information cache. pub worker_cache: WorkerCache, // The batch store @@ -155,8 +156,6 @@ pub struct PrimaryReceiverHandler { pub batch_fetcher: Option, // Validate incoming batches pub validator: V, - // The protocol configuration. - pub protocol_config: ProtocolConfig, } #[async_trait] diff --git a/narwhal/worker/src/tests/batch_maker_tests.rs b/narwhal/worker/src/tests/batch_maker_tests.rs index f2550955bd2054..3c1f14ed0dd043 100644 --- a/narwhal/worker/src/tests/batch_maker_tests.rs +++ b/narwhal/worker/src/tests/batch_maker_tests.rs @@ -25,7 +25,7 @@ async fn make_batch() { // Mock the primary client to always succeed. let mut mock_server = MockWorkerToPrimary::new(); mock_server - .expect_report_our_batch_v2() + .expect_report_own_batch() .returning(|_| Ok(anemo::Response::new(()))); client.set_worker_to_primary_local_handler(Arc::new(mock_server)); @@ -82,7 +82,7 @@ async fn batch_timeout() { // Mock the primary client to always succeed. let mut mock_server = MockWorkerToPrimary::new(); mock_server - .expect_report_our_batch_v2() + .expect_report_own_batch() .returning(|_| Ok(anemo::Response::new(()))); client.set_worker_to_primary_local_handler(Arc::new(mock_server)); diff --git a/narwhal/worker/src/tests/handlers_tests.rs b/narwhal/worker/src/tests/handlers_tests.rs index 4cc9017982655f..3e225afd94f7b6 100644 --- a/narwhal/worker/src/tests/handlers_tests.rs +++ b/narwhal/worker/src/tests/handlers_tests.rs @@ -61,6 +61,7 @@ async fn synchronize() { authority_id, id, committee, + protocol_config: latest_protocol_config.clone(), worker_cache, store: store.clone(), request_batch_timeout: Duration::from_secs(999), @@ -68,7 +69,6 @@ async fn synchronize() { network: Some(send_network), batch_fetcher: None, validator: TrivialTransactionValidator, - protocol_config: latest_protocol_config.clone(), }; // Verify the batch is not in store @@ -158,6 +158,7 @@ async fn synchronize_versioned_batches() { authority_id, id, committee: committee.clone(), + protocol_config: protocol_config_v11.clone(), worker_cache: worker_cache.clone(), store: store.clone(), request_batch_timeout: Duration::from_secs(999), @@ -165,13 +166,13 @@ async fn synchronize_versioned_batches() { network: Some(send_network.clone()), batch_fetcher: None, validator: TrivialTransactionValidator, - protocol_config: protocol_config_v11.clone(), }; let handler_latest_version = PrimaryReceiverHandler { authority_id, id, committee, + protocol_config: latest_protocol_config.clone(), worker_cache, store: store.clone(), request_batch_timeout: Duration::from_secs(999), @@ -179,7 +180,6 @@ async fn synchronize_versioned_batches() { network: Some(send_network), batch_fetcher: None, validator: TrivialTransactionValidator, - protocol_config: latest_protocol_config.clone(), }; // Case #1: Receive BatchV1 and network has not upgraded to 12 so we are okay @@ -228,6 +228,7 @@ async fn synchronize_when_batch_exists() { authority_id, id, committee: committee.clone(), + protocol_config: latest_protocol_version(), worker_cache, store: store.clone(), request_batch_timeout: Duration::from_secs(999), @@ -235,7 +236,6 @@ async fn 
synchronize_when_batch_exists() { network: Some(send_network), batch_fetcher: None, validator: TrivialTransactionValidator, - protocol_config: latest_protocol_version(), }; // Store the batch. @@ -279,6 +279,7 @@ async fn delete_batches() { authority_id, id, committee, + protocol_config: latest_protocol_version(), worker_cache, store: store.clone(), request_batch_timeout: Duration::from_secs(999), @@ -286,7 +287,6 @@ async fn delete_batches() { network: None, batch_fetcher: None, validator: TrivialTransactionValidator, - protocol_config: latest_protocol_version(), }; let message = WorkerDeleteBatchesMessage { digests: vec![digest], diff --git a/narwhal/worker/src/tests/worker_tests.rs b/narwhal/worker/src/tests/worker_tests.rs index 8e51d74ac3e06f..d8635fe3554816 100644 --- a/narwhal/worker/src/tests/worker_tests.rs +++ b/narwhal/worker/src/tests/worker_tests.rs @@ -88,13 +88,13 @@ async fn reject_invalid_clients_transactions() { worker_id, committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters, NilTxValidator, client, batch_store, metrics, &mut tx_shutdown, - latest_protocol_version(), ); // Wait till other services have been able to start up @@ -184,13 +184,13 @@ async fn handle_remote_clients_transactions() { worker_id, committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters, TrivialTransactionValidator::default(), client.clone(), batch_store, metrics, &mut tx_shutdown, - latest_protocol_version(), ); // Spawn a network listener to receive our batch's digest. @@ -203,7 +203,7 @@ async fn handle_remote_clients_transactions() { let (tx_await_batch, mut rx_await_batch) = test_utils::test_channel!(CHANNEL_CAPACITY); let mut mock_primary_server = MockWorkerToPrimary::new(); mock_primary_server - .expect_report_our_batch_v2() + .expect_report_own_batch() .withf(move |request| { let message = request.body(); @@ -303,13 +303,13 @@ async fn handle_local_clients_transactions() { worker_id, committee.clone(), worker_cache.clone(), + latest_protocol_version(), parameters, TrivialTransactionValidator::default(), client.clone(), batch_store, metrics, &mut tx_shutdown, - latest_protocol_version(), ); // Spawn a network listener to receive our batch's digest. 
@@ -322,7 +322,7 @@ async fn handle_local_clients_transactions() { let (tx_await_batch, mut rx_await_batch) = test_utils::test_channel!(CHANNEL_CAPACITY); let mut mock_primary_server = MockWorkerToPrimary::new(); mock_primary_server - .expect_report_our_batch_v2() + .expect_report_own_batch() .withf(move |request| { let message = request.body(); message.digest == batch_digest && message.worker_id == worker_id @@ -410,6 +410,7 @@ async fn get_network_peers_from_admin_server() { authority_1.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), primary_1_parameters.clone(), client_1.clone(), store.header_store.clone(), @@ -433,7 +434,6 @@ async fn get_network_peers_from_admin_server() { &mut tx_shutdown, tx_feedback, &Registry::new(), - latest_protocol_version(), ); // Wait for tasks to start @@ -455,13 +455,13 @@ async fn get_network_peers_from_admin_server() { worker_id, committee.clone(), worker_cache.clone(), + latest_protocol_version(), worker_1_parameters.clone(), TrivialTransactionValidator::default(), client_1.clone(), store.batch_store.clone(), metrics_1.clone(), &mut tx_shutdown, - latest_protocol_version(), ); let primary_1_peer_id = Hex::encode(authority_1.network_keypair().copy().public().0.as_bytes()); @@ -535,6 +535,7 @@ async fn get_network_peers_from_admin_server() { authority_2.network_keypair().copy(), committee.clone(), worker_cache.clone(), + latest_protocol_version(), primary_2_parameters.clone(), client_2.clone(), store.header_store.clone(), @@ -558,7 +559,6 @@ async fn get_network_peers_from_admin_server() { &mut tx_shutdown_2, tx_feedback_2, &Registry::new(), - latest_protocol_version(), ); // Wait for tasks to start @@ -581,13 +581,13 @@ async fn get_network_peers_from_admin_server() { worker_id, committee.clone(), worker_cache.clone(), + latest_protocol_version(), worker_2_parameters.clone(), TrivialTransactionValidator::default(), client_2, store.batch_store, metrics_2.clone(), &mut tx_shutdown_worker, - latest_protocol_version(), ); // Wait for tasks to start. Sleeping longer here to ensure all primaries and workers diff --git a/narwhal/worker/src/worker.rs b/narwhal/worker/src/worker.rs index 9abea77c3370b7..dcea35826b80a8 100644 --- a/narwhal/worker/src/worker.rs +++ b/narwhal/worker/src/worker.rs @@ -62,12 +62,12 @@ pub struct Worker { committee: Committee, /// The worker information cache. worker_cache: WorkerCache, + // The protocol configuration. + protocol_config: ProtocolConfig, /// The configuration parameters parameters: Parameters, /// The persistent storage. store: DBMap, - // The protocol configuration. 
- protocol_config: ProtocolConfig, } impl Worker { @@ -77,13 +77,13 @@ impl Worker { id: WorkerId, committee: Committee, worker_cache: WorkerCache, + protocol_config: ProtocolConfig, parameters: Parameters, validator: impl TransactionValidator, client: NetworkClient, store: DBMap, metrics: Metrics, tx_shutdown: &mut PreSubscribedBroadcastSender, - protocol_config: ProtocolConfig, ) -> Vec> { let worker_name = keypair.public().clone(); let worker_peer_id = PeerId(worker_name.0.to_bytes()); @@ -96,9 +96,9 @@ impl Worker { id, committee: committee.clone(), worker_cache, + protocol_config: protocol_config.clone(), parameters: parameters.clone(), store, - protocol_config: protocol_config.clone(), }; let node_metrics = Arc::new(metrics.worker_metrics.unwrap()); @@ -111,11 +111,11 @@ impl Worker { let mut shutdown_receivers = tx_shutdown.subscribe_n(NUM_SHUTDOWN_RECEIVERS); let mut worker_service = WorkerToWorkerServer::new(WorkerReceiverHandler { + protocol_config: protocol_config.clone(), id: worker.id, client: client.clone(), store: worker.store.clone(), validator: validator.clone(), - protocol_config: protocol_config.clone(), }); // Apply rate limits from configuration as needed. if let Some(limit) = parameters.anemo.report_batch_rate_limit { @@ -140,6 +140,7 @@ impl Worker { authority_id: worker.authority.id(), id: worker.id, committee: worker.committee.clone(), + protocol_config: protocol_config.clone(), worker_cache: worker.worker_cache.clone(), store: worker.store.clone(), request_batch_timeout: worker.parameters.sync_retry_delay, @@ -147,7 +148,6 @@ impl Worker { network: None, batch_fetcher: None, validator: validator.clone(), - protocol_config: protocol_config.clone(), }); // Receive incoming messages from other workers. @@ -290,6 +290,7 @@ impl Worker { authority_id: worker.authority.id(), id: worker.id, committee: worker.committee.clone(), + protocol_config, worker_cache: worker.worker_cache.clone(), store: worker.store.clone(), request_batch_timeout: worker.parameters.sync_retry_delay, @@ -297,7 +298,6 @@ impl Worker { network: Some(network.clone()), batch_fetcher: Some(batch_fetcher), validator: validator.clone(), - protocol_config, }), );