Skip to content

Commit

Permalink
Move protocol config to NarwhalManager start...
Browse files Browse the repository at this point in the history
Remove epoch from BatchV2
  • Loading branch information
arun-koshy committed May 27, 2023
1 parent f5f16c0 commit f8d79f6
Show file tree
Hide file tree
Showing 32 changed files with 149 additions and 366 deletions.
20 changes: 4 additions & 16 deletions crates/sui-core/src/consensus_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,7 @@ mod tests {
})
.collect();

let batch = Batch::new(
transaction_bytes,
latest_protocol_config,
validator.epoch_store.epoch(),
);
let batch = Batch::new(transaction_bytes, latest_protocol_config);
let res_batch = validator
.validate_batch(&batch, latest_protocol_config)
.await;
Expand All @@ -235,11 +231,7 @@ mod tests {
})
.collect();

let batch = Batch::new(
bogus_transaction_bytes,
latest_protocol_config,
validator.epoch_store.epoch(),
);
let batch = Batch::new(bogus_transaction_bytes, latest_protocol_config);
let res_batch = validator
.validate_batch(&batch, latest_protocol_config)
.await;
Expand All @@ -248,7 +240,7 @@ mod tests {
// TODO: Remove once we have upgraded to protocol version 12.
// protocol version 11 should only support BatchV1
let protocol_config_v11 = &get_protocol_config(11);
let batch_v1 = Batch::new(vec![], protocol_config_v11, validator.epoch_store.epoch());
let batch_v1 = Batch::new(vec![], protocol_config_v11);

// Case #1: Receive BatchV1 and network has not upgraded to 12 so we are okay
let res_batch = validator
Expand All @@ -261,11 +253,7 @@ mod tests {
.await;
assert!(res_batch.is_err());

let batch_v2 = Batch::new(
vec![],
latest_protocol_config,
validator.epoch_store.epoch(),
);
let batch_v2 = Batch::new(vec![], latest_protocol_config);
// Case #3: Receive BatchV2 but network is still in v11 so we fail because we expect BatchV1
let res_batch = validator
.validate_batch(&batch_v2, protocol_config_v11)
Expand Down
50 changes: 27 additions & 23 deletions crates/sui-core/src/narwhal_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use prometheus::{register_int_gauge_with_registry, IntGauge, Registry};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use sui_protocol_config::ProtocolConfig;
use sui_protocol_config::{ProtocolVersion, ProtocolConfig};
use sui_types::crypto::{AuthorityKeyPair, NetworkKeyPair};
use tokio::sync::Mutex;

#[derive(PartialEq)]
enum Running {
True(Epoch),
True(Epoch, ProtocolVersion),
False,
}

Expand Down Expand Up @@ -89,25 +89,17 @@ pub struct NarwhalManager {
}

impl NarwhalManager {
pub fn new(
protocol_config: ProtocolConfig,
config: NarwhalConfiguration,
metrics: NarwhalManagerMetrics,
) -> Self {
pub fn new(config: NarwhalConfiguration, metrics: NarwhalManagerMetrics) -> Self {
// Create the Narwhal Primary with configuration
let primary_node = PrimaryNode::new(
protocol_config.clone(),
config.parameters.clone(),
true,
config.registry_service.clone(),
);

// Create Narwhal Workers with configuration
let worker_nodes = WorkerNodes::new(
config.registry_service.clone(),
protocol_config,
config.parameters.clone(),
);
let worker_nodes =
WorkerNodes::new(config.registry_service.clone(), config.parameters.clone());

let store_cache_metrics =
CertificateStoreCacheMetrics::new(&config.registry_service.default_registry());
Expand All @@ -126,9 +118,15 @@ impl NarwhalManager {
}

// Starts the Narwhal (primary & worker(s)) - if not already running.
// Note: After a binary is updated with the new protocol version and the node
// is restarted, the protocol config does not take effect until a quorum
// of validators has updated the binary. Because of this, the protocol upgrade
// will happen in the epoch after quorum is reached. In this case NarwhalManager
// is not recreated, which is why we pass the protocol config in at start and not at creation.
pub async fn start<State, TxValidator: TransactionValidator>(
&self,
committee: Committee,
protocol_config: ProtocolConfig,
worker_cache: WorkerCache,
execution_state: Arc<State>,
tx_validator: TxValidator,
Expand All @@ -137,10 +135,10 @@ impl NarwhalManager {
{
let mut running = self.running.lock().await;

if let Running::True(epoch) = *running {
if let Running::True(epoch, version) = *running {
tracing::warn!(
"Narwhal node is already Running at epoch {:?} - shutdown first before starting",
epoch
"Narwhal node is already Running for epoch {epoch:?} & version {version:?} - shutdown first before starting",

);
return;
}
Expand All @@ -156,7 +154,11 @@ impl NarwhalManager {

let name = self.primary_keypair.public().clone();

tracing::info!("Starting up Narwhal for epoch {}", committee.epoch());
tracing::info!(
"Starting up Narwhal for epoch {} & version {:?}",
committee.epoch(),
protocol_config.version
);

// start primary
const MAX_PRIMARY_RETRIES: u32 = 2;
Expand All @@ -168,6 +170,7 @@ impl NarwhalManager {
self.primary_keypair.copy(),
self.network_keypair.copy(),
committee.clone(),
protocol_config.clone(),
worker_cache.clone(),
network_client.clone(),
&store,
Expand Down Expand Up @@ -206,6 +209,7 @@ impl NarwhalManager {
name.clone(),
id_keypair_copy,
committee.clone(),
protocol_config.clone(),
worker_cache.clone(),
network_client.clone(),
&store,
Expand All @@ -228,8 +232,9 @@ impl NarwhalManager {
}

tracing::info!(
"Starting up Narwhal for epoch {} is complete - took {} seconds",
"Starting up Narwhal for epoch {} & version {:?} is complete - took {} seconds",
committee.epoch(),
protocol_config.version,
now.elapsed().as_secs_f64()
);

Expand All @@ -242,7 +247,7 @@ impl NarwhalManager {
.set(primary_retries as i64);
self.metrics.start_worker_retries.set(worker_retries as i64);

*running = Running::True(committee.epoch());
*running = Running::True(committee.epoch(), protocol_config.version);
}

// Shuts down whole Narwhal (primary & worker(s)) and waits until nodes
Expand All @@ -251,16 +256,15 @@ impl NarwhalManager {
let mut running = self.running.lock().await;

match *running {
Running::True(epoch) => {
Running::True(epoch, version) => {
let now = Instant::now();
tracing::info!("Shutting down Narwhal epoch {:?}", epoch);
tracing::info!("Shutting down Narwhal for epoch {epoch:?} & version {version:?}");

self.primary_node.shutdown().await;
self.worker_nodes.shutdown().await;

tracing::info!(
"Narwhal shutdown for epoch {:?} is complete - took {} seconds",
epoch,
"Narwhal shutdown for epoch {epoch:?} & version {version:?} is complete - took {} seconds",
now.elapsed().as_secs_f64()
);

Expand Down
5 changes: 3 additions & 2 deletions crates/sui-core/src/unit_tests/narwhal_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ async fn test_narwhal_manager() {

let metrics = NarwhalManagerMetrics::new(&Registry::new());

let narwhal_manager =
NarwhalManager::new(latest_protocol_version(), narwhal_config, metrics);
let narwhal_manager = NarwhalManager::new(narwhal_config, metrics);

// start narwhal
narwhal_manager
.start(
narwhal_committee.clone(),
latest_protocol_version(),
worker_cache.clone(),
Arc::new(execution_state.clone()),
TrivialTransactionValidator::default(),
Expand Down Expand Up @@ -195,6 +195,7 @@ async fn test_narwhal_manager() {
narwhal_manager
.start(
narwhal_committee.clone(),
latest_protocol_version(),
worker_cache.clone(),
Arc::new(execution_state.clone()),
TrivialTransactionValidator::default(),
Expand Down
16 changes: 4 additions & 12 deletions crates/sui-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,12 +676,8 @@ impl SuiNode {
connection_monitor_status.clone(),
&registry_service.default_registry(),
));
let narwhal_manager = Self::construct_narwhal_manager(
epoch_store.protocol_config().clone(),
config,
consensus_config,
registry_service,
)?;
let narwhal_manager =
Self::construct_narwhal_manager(config, consensus_config, registry_service)?;

let mut narwhal_epoch_data_remover =
EpochDataRemover::new(narwhal_manager.get_storage_base_path());
Expand Down Expand Up @@ -776,6 +772,7 @@ impl SuiNode {
narwhal_manager
.start(
new_epoch_start_state.get_narwhal_committee(),
epoch_store.protocol_config().clone(),
worker_cache,
consensus_handler,
SuiTxValidator::new(
Expand Down Expand Up @@ -846,7 +843,6 @@ impl SuiNode {
}

fn construct_narwhal_manager(
protocol_config: ProtocolConfig,
config: &NodeConfig,
consensus_config: &ConsensusConfig,
registry_service: &RegistryService,
Expand All @@ -862,11 +858,7 @@ impl SuiNode {

let metrics = NarwhalManagerMetrics::new(&registry_service.default_registry());

Ok(NarwhalManager::new(
protocol_config,
narwhal_config,
metrics,
))
Ok(NarwhalManager::new(narwhal_config, metrics))
}

fn construct_consensus_adapter(
Expand Down
2 changes: 1 addition & 1 deletion narwhal/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,6 @@ async fn run(
// Spawn the primary and consensus core.
("primary", Some(sub_matches)) => {
let primary = PrimaryNode::new(
ProtocolConfig::get_for_version(ProtocolVersion::max()),
parameters.clone(),
!sub_matches.is_present("consensus-disabled"),
registry_service,
Expand All @@ -293,6 +292,7 @@ async fn run(
primary_keypair,
primary_network_keypair,
committee,
ProtocolConfig::get_for_version(ProtocolVersion::max()),
worker_cache,
client.clone(),
&store,
Expand Down
8 changes: 4 additions & 4 deletions narwhal/node/src/primary_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use tracing::{debug, info, instrument};
use types::{Certificate, ConditionalBroadcastReceiver, PreSubscribedBroadcastSender, Round};

struct PrimaryNodeInner {
protocol_config: ProtocolConfig,
// The configuration parameters.
parameters: Parameters,
// Whether to run consensus (and an executor client) or not.
Expand Down Expand Up @@ -68,6 +67,7 @@ impl PrimaryNodeInner {
network_keypair: NetworkKeyPair,
// The committee information.
committee: Committee,
protocol_config: ProtocolConfig,
// The worker information cache.
worker_cache: WorkerCache,
// Client for communications.
Expand Down Expand Up @@ -101,7 +101,7 @@ impl PrimaryNodeInner {
worker_cache,
client,
store,
self.protocol_config.clone(),
protocol_config.clone(),
self.parameters.clone(),
self.internal_consensus,
execution_state,
Expand Down Expand Up @@ -406,13 +406,11 @@ pub struct PrimaryNode {

impl PrimaryNode {
pub fn new(
protocol_config: ProtocolConfig,
parameters: Parameters,
internal_consensus: bool,
registry_service: RegistryService,
) -> PrimaryNode {
let inner = PrimaryNodeInner {
protocol_config,
parameters,
internal_consensus,
registry_service,
Expand All @@ -435,6 +433,7 @@ impl PrimaryNode {
network_keypair: NetworkKeyPair,
// The committee information.
committee: Committee,
protocol_config: ProtocolConfig,
// The worker information cache.
worker_cache: WorkerCache,
// Client for communications.
Expand All @@ -455,6 +454,7 @@ impl PrimaryNode {
keypair,
network_keypair,
committee,
protocol_config,
worker_cache,
client,
store,
Expand Down
11 changes: 3 additions & 8 deletions narwhal/node/src/worker_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,22 +260,16 @@ pub struct WorkerNodes {
workers: ArcSwap<HashMap<WorkerId, WorkerNode>>,
registry_service: RegistryService,
registry_id: ArcSwapOption<RegistryID>,
protocol_config: ProtocolConfig,
parameters: Parameters,
client: ArcSwapOption<NetworkClient>,
}

impl WorkerNodes {
pub fn new(
registry_service: RegistryService,
protocol_config: ProtocolConfig,
parameters: Parameters,
) -> Self {
pub fn new(registry_service: RegistryService, parameters: Parameters) -> Self {
Self {
workers: ArcSwap::from(Arc::new(HashMap::default())),
registry_service,
registry_id: ArcSwapOption::empty(),
protocol_config,
parameters,
client: ArcSwapOption::empty(),
}
Expand All @@ -290,6 +284,7 @@ impl WorkerNodes {
ids_and_keypairs: Vec<(WorkerId, NetworkKeyPair)>,
// The committee information.
committee: Committee,
protocol_config: ProtocolConfig,
// The worker information cache.
worker_cache: WorkerCache,
// Client for communications.
Expand Down Expand Up @@ -321,7 +316,7 @@ impl WorkerNodes {
for (worker_id, key_pair) in ids_and_keypairs {
let worker = WorkerNode::new(
worker_id,
self.protocol_config.clone(),
protocol_config.clone(),
self.parameters.clone(),
self.registry_service.clone(),
);
Expand Down
Loading

0 comments on commit f8d79f6

Please sign in to comment.