diff --git a/src/bin/bench-tps.rs b/src/bin/bench-tps.rs index 685a6a477ed396..f60705f2e94c1e 100644 --- a/src/bin/bench-tps.rs +++ b/src/bin/bench-tps.rs @@ -686,7 +686,7 @@ fn converge( &spy_ref, window.clone(), None, - gossip_socket, + Arc::new(gossip_socket), exit_signal.clone(), ); let mut v: Vec = vec![]; diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index 97a599c630c646..76dc8aa93a5347 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -168,13 +168,12 @@ impl BroadcastStage { /// See `crdt` for network layer definitions. /// # Arguments /// * `sock` - Socket to send from. - /// * `exit` - Boolean to signal system exit. /// * `crdt` - CRDT structure /// * `window` - Cache of blobs that we have broadcast /// * `recycler` - Blob recycler. /// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. pub fn new( - sock: UdpSocket, + sock: Arc, crdt: Arc>, window: SharedWindow, entry_height: u64, diff --git a/src/crdt.rs b/src/crdt.rs index d4ad10ec1dd013..850c2f25e61a72 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -1261,14 +1261,14 @@ impl Crdt { } pub struct Sockets { - pub gossip: UdpSocket, - pub requests: UdpSocket, - pub replicate: UdpSocket, - pub transaction: UdpSocket, - pub respond: UdpSocket, - pub broadcast: UdpSocket, - pub repair: UdpSocket, - pub retransmit: UdpSocket, + pub gossip: Arc, + pub requests: Arc, + pub replicate: Arc, + pub transaction: Arc, + pub respond: Arc, + pub broadcast: Arc, + pub repair: Arc, + pub retransmit: Arc, } pub struct Node { @@ -1301,14 +1301,14 @@ impl Node { Node { info, sockets: Sockets { - gossip, - requests, - replicate, - transaction, - respond, - broadcast, - repair, - retransmit, + gossip: Arc::new(gossip), + requests: Arc::new(requests), + replicate: Arc::new(replicate), + transaction: Arc::new(transaction), + respond: Arc::new(respond), + broadcast: Arc::new(broadcast), + repair: Arc::new(repair), + retransmit: Arc::new(retransmit), }, } } @@ -1362,14 +1362,14 @@ impl Node { Node { info, sockets: Sockets { - gossip, - requests, - replicate, - transaction, - respond, - broadcast, - repair, - retransmit, + gossip: Arc::new(gossip), + requests: Arc::new(requests), + replicate: Arc::new(replicate), + transaction: Arc::new(transaction), + respond: Arc::new(respond), + broadcast: Arc::new(broadcast), + repair: Arc::new(repair), + retransmit: Arc::new(retransmit), }, } } diff --git a/src/drone.rs b/src/drone.rs index 94f64b2498ed30..8ef1e351fbb144 100644 --- a/src/drone.rs +++ b/src/drone.rs @@ -286,66 +286,69 @@ mod tests { let leader_data = leader.info.clone(); let ledger_path = tmp_ledger_path("send_airdrop"); - let server = Fullnode::new_with_bank( - leader_keypair, - bank, - 0, - &[], - leader, - None, - exit.clone(), - Some(&ledger_path), - false, - ); - //TODO: this seems unstable - sleep(Duration::from_millis(900)); - - let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket"); - addr.set_ip(get_ip_addr().expect("drone get_ip_addr")); - let mut drone = Drone::new( - alice.keypair(), - addr, - leader_data.contact_info.tpu, - leader_data.contact_info.rpu, - None, - Some(150_000), - ); - - let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket"); - let transactions_socket = - UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); - - let mut client = ThinClient::new( - leader_data.contact_info.rpu, - requests_socket, - leader_data.contact_info.tpu, - transactions_socket, - ); - - let bob_req = DroneRequest::GetAirdrop { - airdrop_request_amount: 50, - client_pubkey: bob_pubkey, - }; - let bob_sig = drone.send_airdrop(bob_req).unwrap(); - assert!(client.poll_for_signature(&bob_sig).is_ok()); - - let carlos_req = DroneRequest::GetAirdrop { - airdrop_request_amount: 5_000_000, - client_pubkey: carlos_pubkey, - }; - let carlos_sig = drone.send_airdrop(carlos_req).unwrap(); - assert!(client.poll_for_signature(&carlos_sig).is_ok()); + { + let server = Fullnode::new_with_bank( + leader_keypair, + bank, + 0, + &[], + leader, + None, + exit.clone(), + Some(&ledger_path), + false, + ); + //TODO: this seems unstable + sleep(Duration::from_millis(900)); + + let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket"); + addr.set_ip(get_ip_addr().expect("drone get_ip_addr")); + let mut drone = Drone::new( + alice.keypair(), + addr, + leader_data.contact_info.tpu, + leader_data.contact_info.rpu, + None, + Some(150_000), + ); - let bob_balance = client.get_balance(&bob_pubkey); - info!("Small request balance: {:?}", bob_balance); - assert_eq!(bob_balance.unwrap(), SMALL_BATCH); + let requests_socket = + UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket"); + let transactions_socket = + UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket"); - let carlos_balance = client.get_balance(&carlos_pubkey); - info!("TPS request balance: {:?}", carlos_balance); - assert_eq!(carlos_balance.unwrap(), TPS_BATCH); + let mut client = ThinClient::new( + leader_data.contact_info.rpu, + requests_socket, + leader_data.contact_info.tpu, + transactions_socket, + ); - exit.store(true, Ordering::Relaxed); - server.join().unwrap(); + let bob_req = DroneRequest::GetAirdrop { + airdrop_request_amount: 50, + client_pubkey: bob_pubkey, + }; + let bob_sig = drone.send_airdrop(bob_req).unwrap(); + assert!(client.poll_for_signature(&bob_sig).is_ok()); + + let carlos_req = DroneRequest::GetAirdrop { + airdrop_request_amount: 5_000_000, + client_pubkey: carlos_pubkey, + }; + let carlos_sig = drone.send_airdrop(carlos_req).unwrap(); + assert!(client.poll_for_signature(&carlos_sig).is_ok()); + + let bob_balance = client.get_balance(&bob_pubkey); + info!("Small request balance: {:?}", bob_balance); + assert_eq!(bob_balance.unwrap(), SMALL_BATCH); + + let carlos_balance = client.get_balance(&carlos_pubkey); + info!("TPS request balance: {:?}", carlos_balance); + assert_eq!(carlos_balance.unwrap(), TPS_BATCH); + + exit.store(true, Ordering::Relaxed); + server.join().unwrap(); + } remove_dir_all(ledger_path).unwrap(); } } diff --git a/src/fullnode.rs b/src/fullnode.rs index 9ee57fa290b2e8..68acb473a5c30e 100644 --- a/src/fullnode.rs +++ b/src/fullnode.rs @@ -2,7 +2,7 @@ use bank::Bank; use broadcast_stage::BroadcastStage; -use crdt::{Crdt, Node, NodeInfo}; +use crdt::{Crdt, Node, NodeInfo, Sockets}; use drone::DRONE_PORT; use entry::Entry; use ledger::read_ledger; @@ -21,9 +21,211 @@ use tvu::Tvu; use untrusted::Input; use window; -pub struct Fullnode { - exit: Arc, +#[derive(Copy, Clone, Debug)] +pub enum NodeEvent { + /// State Change from Validator to Leader + ValidatorToLeader, + /// State Change from Leader to Validator + LeaderToValidator, +} + +pub struct CommonState { + keypair: Arc, + bank: Arc, + blob_recycler: BlobRecycler, + window: window::SharedWindow, + crdt: Arc>, + sockets: Sockets, +} + +#[derive(Copy, Clone, Debug)] +pub enum NodeRole { + Leader, + Validator, +} + +struct ServiceManager { thread_hdls: Vec>, + exit: Arc, +} + +impl ServiceManager { + fn new() -> Self { + let exit = Arc::new(AtomicBool::new(false)); + let thread_hdls = vec![]; + ServiceManager { exit, thread_hdls } + } + + fn thread_hdls(self) -> Vec> { + self.thread_hdls + } + + fn exit(&self) -> () { + self.exit.store(true, Ordering::Relaxed); + } + + fn join(self) -> Result<()> { + let mut results = vec![]; + for thread_hdl in self.thread_hdls { + let result = thread_hdl.join(); + if let Err(ref err) = &result { + println!("Thread panicked with error: {:?}", err); + } + + results.push(result); + } + + for r in results { + r?; + } + + Ok(()) + } + + fn close(self) -> Result<()> { + self.exit(); + self.join() + } +} + +trait RoleServices { + fn close_services(self: Box) -> Result<()>; + fn service_manager_ref(&self) -> &ServiceManager; + fn service_manager_owned(self: Box) -> ServiceManager; +} + +struct LeaderRole { + service_manager: ServiceManager, +} + +impl LeaderRole { + fn new( + entry_height: u64, + common_state: &CommonState, + sigverify_disabled: bool, + ledger_path: &str, + ) -> Self { + let mut service_manager = ServiceManager::new(); + let service_handles = Self::leader_services( + entry_height, + common_state, + sigverify_disabled, + ledger_path, + service_manager.exit.clone(), + ); + + service_manager.thread_hdls = service_handles; + LeaderRole { service_manager } + } + + fn leader_services( + entry_height: u64, + common_state: &CommonState, + sigverify_disabled: bool, + ledger_path: &str, + exit: Arc, + ) -> Vec> { + let mut thread_hdls = vec![]; + let tick_duration = None; + // TODO: To light up PoH, uncomment the following line: + //let tick_duration = Some(Duration::from_millis(1000)); + let (tpu, blob_receiver) = Tpu::new( + &common_state.keypair, + &common_state.bank, + &common_state.crdt, + tick_duration, + common_state.sockets.transaction.clone(), + &common_state.blob_recycler, + exit, + ledger_path, + sigverify_disabled, + ); + + thread_hdls.extend(tpu.thread_hdls()); + + let broadcast_stage = BroadcastStage::new( + common_state.sockets.broadcast.clone(), + common_state.crdt.clone(), + common_state.window.clone(), + entry_height, + common_state.blob_recycler.clone(), + blob_receiver, + ); + + thread_hdls.extend(broadcast_stage.thread_hdls()); + thread_hdls + } +} + +impl RoleServices for LeaderRole { + fn close_services(self: Box) -> Result<()> { + self.service_manager_owned().close() + } + + fn service_manager_ref(&self) -> &ServiceManager { + &self.service_manager + } + + fn service_manager_owned(self: Box) -> ServiceManager { + self.service_manager + } +} + +struct ValidatorRole { + service_manager: ServiceManager, +} + +impl ValidatorRole { + fn new(entry_height: u64, common_state: &CommonState, ledger_path: Option<&str>) -> Self { + let mut service_manager = ServiceManager::new(); + let service_handles = Self::validator_services( + entry_height, + common_state, + ledger_path, + service_manager.exit.clone(), + ); + + service_manager.thread_hdls = service_handles; + ValidatorRole { service_manager } + } + + fn validator_services( + entry_height: u64, + common_state: &CommonState, + ledger_path: Option<&str>, + exit: Arc, + ) -> Vec> { + let mut thread_hdls = vec![]; + let tvu = Tvu::new( + &common_state.keypair, + &common_state.bank, + entry_height, + common_state.crdt.clone(), + common_state.window.clone(), + common_state.sockets.replicate.clone(), + common_state.sockets.repair.clone(), + common_state.sockets.retransmit.clone(), + ledger_path, + exit, + ); + + thread_hdls.extend(tvu.thread_hdls()); + thread_hdls + } +} + +impl RoleServices for ValidatorRole { + fn close_services(self: Box) -> Result<()> { + self.service_manager_owned().close() + } + + fn service_manager_ref(&self) -> &ServiceManager { + &self.service_manager + } + + fn service_manager_owned(self: Box) -> ServiceManager { + self.service_manager + } } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq)] @@ -48,6 +250,16 @@ impl Config { } } +pub struct Fullnode { + exit: Arc, + thread_hdls: Vec>, + node_role: NodeRole, + role_services: Option>, + common_state: CommonState, + ledger_path: Option, + sigverify_disabled: bool, +} + impl Fullnode { pub fn new( node: Node, @@ -177,11 +389,10 @@ impl Fullnode { let bank = Arc::new(bank); let mut thread_hdls = vec![]; - let rpu = Rpu::new( &bank, - node.sockets.requests, - node.sockets.respond, + node.sockets.requests.clone(), + node.sockets.respond.clone(), exit.clone(), ); thread_hdls.extend(rpu.thread_hdls()); @@ -209,69 +420,113 @@ impl Fullnode { &crdt, window.clone(), ledger_path, - node.sockets.gossip, + node.sockets.gossip.clone(), exit.clone(), ); thread_hdls.extend(ncp.thread_hdls()); + let keypair = Arc::new(keypair); + // Make the common state info + let common_state = CommonState { + keypair, + bank, + blob_recycler, + window, + crdt, + sockets: node.sockets, + }; + + let node_role; + let role_services: Box; match leader_info { Some(leader_info) => { // Start in validator mode. // TODO: let Crdt get that data from the network? - crdt.write().unwrap().insert(leader_info); - let tvu = Tvu::new( - keypair, - &bank, - entry_height, - crdt, - window, - node.sockets.replicate, - node.sockets.repair, - node.sockets.retransmit, - ledger_path, - exit.clone(), - ); - thread_hdls.extend(tvu.thread_hdls()); + common_state.crdt.write().unwrap().insert(leader_info); + let validator_role = ValidatorRole::new(entry_height, &common_state, ledger_path); + + role_services = Box::new(validator_role); + node_role = NodeRole::Validator; } None => { // Start in leader mode. let ledger_path = ledger_path.expect("ledger path"); - let tick_duration = None; - // TODO: To light up PoH, uncomment the following line: - //let tick_duration = Some(Duration::from_millis(1000)); - - let (tpu, blob_receiver) = Tpu::new( - keypair, - &bank, - &crdt, - tick_duration, - node.sockets.transaction, - &blob_recycler, - exit.clone(), - ledger_path, - sigverify_disabled, + let leader_role = + LeaderRole::new(entry_height, &common_state, sigverify_disabled, ledger_path); + role_services = Box::new(leader_role); + node_role = NodeRole::Leader; + } + } + + Fullnode { + exit, + thread_hdls, + node_role, + role_services: Some(role_services), + sigverify_disabled, + common_state, + // Take ownership of the ledger path passed in because we store the value + ledger_path: ledger_path.map(|path| path.to_string()), + } + } + + pub fn handle_event(&mut self, event: NodeEvent) -> Result<()> { + let role_services_option = self.role_services.take(); + let role_services = role_services_option.unwrap(); + let ref_ledger_path = self.ledger_path.as_ref().map(String::as_ref); + + match (self.node_role, event) { + (NodeRole::Leader, NodeEvent::LeaderToValidator) => { + // TODO (carlin): If error occurs on closing the other services we + // still try to continue opening the other services? Should we return + // Ok or Err? Right now on join failures, we still try to spin up the + // new services and return an Error after. + let close_result = role_services.close_services(); + let validator_role = ValidatorRole::new( + 0, //TODO (carllin): fill in the actual entry_height + &self.common_state, + ref_ledger_path, ); - thread_hdls.extend(tpu.thread_hdls()); - - let broadcast_stage = BroadcastStage::new( - node.sockets.broadcast, - crdt, - window, - entry_height, - blob_recycler.clone(), - blob_receiver, + + self.role_services = Some(Box::new(validator_role)); + self.node_role = NodeRole::Validator; + close_result? + } + + (NodeRole::Validator, NodeEvent::ValidatorToLeader) => { + let ledger_path = + ref_ledger_path.expect("ledger path expected for transition to leader role"); + let close_result = role_services.close_services(); + let leader_role = LeaderRole::new( + 0, //TODO (carllin): fill in the actual entry_height + &self.common_state, + self.sigverify_disabled, + ledger_path, ); - thread_hdls.extend(broadcast_stage.thread_hdls()); + self.role_services = Some(Box::new(leader_role)); + self.node_role = NodeRole::Leader; + close_result? } + + _ => panic!( + format!( + "Invalid node role transition : {:#?} -> {:#?}", + self.node_role, event, + ).to_string() + ), } - Fullnode { exit, thread_hdls } + Ok(()) } //used for notifying many nodes in parallel to exit pub fn exit(&self) { + if let Some(ref role_services) = self.role_services { + role_services.service_manager_ref().exit(); + } self.exit.store(true, Ordering::Relaxed); } + pub fn close(self) -> Result<()> { self.exit(); self.join() @@ -280,13 +535,19 @@ impl Fullnode { impl Service for Fullnode { fn thread_hdls(self) -> Vec> { - self.thread_hdls + let mut thread_hdls = self.thread_hdls; + if let Some(role_services) = self.role_services { + thread_hdls.extend(role_services.service_manager_owned().thread_hdls()) + } + + thread_hdls } fn join(self) -> Result<()> { for thread_hdl in self.thread_hdls() { thread_hdl.join()?; } + Ok(()) } } diff --git a/src/ncp.rs b/src/ncp.rs index 11f3468848284a..4f0779b2d0bc74 100644 --- a/src/ncp.rs +++ b/src/ncp.rs @@ -21,12 +21,11 @@ impl Ncp { crdt: &Arc>, window: SharedWindow, ledger_path: Option<&str>, - gossip_socket: UdpSocket, + gossip_socket: Arc, exit: Arc, ) -> Self { let blob_recycler = BlobRecycler::default(); let (request_sender, request_receiver) = channel(); - let gossip_socket = Arc::new(gossip_socket); trace!( "Ncp: id: {:?}, listening on: {:?}", &crdt.read().unwrap().id.as_ref()[..4], diff --git a/src/replicate_stage.rs b/src/replicate_stage.rs index 3e33ce40c784b5..5e000609b11cc3 100644 --- a/src/replicate_stage.rs +++ b/src/replicate_stage.rs @@ -66,7 +66,7 @@ impl ReplicateStage { Ok(()) } pub fn new( - keypair: Keypair, + keypair: &Arc, bank: Arc, crdt: Arc>, blob_recycler: BlobRecycler, @@ -84,7 +84,7 @@ impl ReplicateStage { ); let vote_stage = VoteStage::new( - Arc::new(keypair), + keypair.clone(), bank.clone(), crdt.clone(), blob_recycler.clone(), diff --git a/src/retransmit_stage.rs b/src/retransmit_stage.rs index da5c4f48da7395..fea6116f8865a5 100644 --- a/src/retransmit_stage.rs +++ b/src/retransmit_stage.rs @@ -42,7 +42,6 @@ fn retransmit( /// See `crdt` for network layer definitions. /// # Arguments /// * `sock` - Socket to read from. Read timeout is set to 1. -/// * `exit` - Boolean to signal system exit. /// * `crdt` - This structure needs to be updated and populated by the bank and via gossip. /// * `recycler` - Blob recycler. /// * `r` - Receive channel for blobs to be retransmitted to all the layer 1 nodes. diff --git a/src/rpu.rs b/src/rpu.rs index 1cd7ca0007b910..637fd3223b336c 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -42,14 +42,14 @@ pub struct Rpu { impl Rpu { pub fn new( bank: &Arc, - requests_socket: UdpSocket, - respond_socket: UdpSocket, + requests_socket: Arc, + respond_socket: Arc, exit: Arc, ) -> Self { let packet_recycler = PacketRecycler::default(); let (packet_sender, packet_receiver) = channel(); let t_receiver = streamer::receiver( - Arc::new(requests_socket), + requests_socket, exit, packet_recycler.clone(), packet_sender, @@ -64,12 +64,8 @@ impl Rpu { blob_recycler.clone(), ); - let t_responder = streamer::responder( - "rpu", - Arc::new(respond_socket), - blob_recycler.clone(), - blob_receiver, - ); + let t_responder = + streamer::responder("rpu", respond_socket, blob_recycler.clone(), blob_receiver); let mut thread_hdls = vec![t_receiver, t_responder]; thread_hdls.extend(request_stage.thread_hdls().into_iter()); diff --git a/src/thin_client.rs b/src/thin_client.rs index 27df1ce6c5281c..6b6d95601357e3 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -364,7 +364,13 @@ pub fn poll_gossip_for_leader(leader_ncp: SocketAddr, timeout: Option) -> R let (node, gossip_socket) = Crdt::spy_node(); let crdt = Arc::new(RwLock::new(Crdt::new(node).expect("Crdt::new"))); let window = Arc::new(RwLock::new(vec![])); - let ncp = Ncp::new(&crdt.clone(), window, None, gossip_socket, exit.clone()); + let ncp = Ncp::new( + &crdt.clone(), + window, + None, + Arc::new(gossip_socket), + exit.clone(), + ); let leader_entry_point = NodeInfo::new_entry_point(&leader_ncp); crdt.write().unwrap().insert(&leader_entry_point); diff --git a/src/tpu.rs b/src/tpu.rs index 9011e56f80aa6b..d8f78926ac96dd 100644 --- a/src/tpu.rs +++ b/src/tpu.rs @@ -52,11 +52,11 @@ pub struct Tpu { impl Tpu { pub fn new( - keypair: Keypair, + keypair: &Arc, bank: &Arc, crdt: &Arc>, tick_duration: Option, - transactions_socket: UdpSocket, + transactions_socket: Arc, blob_recycler: &BlobRecycler, exit: Arc, ledger_path: &str, @@ -65,7 +65,7 @@ impl Tpu { let packet_recycler = PacketRecycler::default(); let (fetch_stage, packet_receiver) = - FetchStage::new(Arc::new(transactions_socket), exit, &packet_recycler); + FetchStage::new(transactions_socket, exit, &packet_recycler); let (sigverify_stage, verified_receiver) = SigVerifyStage::new(packet_receiver, sigverify_disabled); @@ -81,7 +81,7 @@ impl Tpu { }; let (write_stage, blob_receiver) = WriteStage::new( - keypair, + keypair.clone(), bank.clone(), crdt.clone(), blob_recycler.clone(), diff --git a/src/tvu.rs b/src/tvu.rs index a84648f6996afd..2c5200bfbdbec9 100644 --- a/src/tvu.rs +++ b/src/tvu.rs @@ -70,21 +70,20 @@ impl Tvu { /// * `exit` - The exit signal. #[cfg_attr(feature = "cargo-clippy", allow(too_many_arguments))] pub fn new( - keypair: Keypair, + keypair: &Arc, bank: &Arc, entry_height: u64, crdt: Arc>, window: SharedWindow, - replicate_socket: UdpSocket, - repair_socket: UdpSocket, - retransmit_socket: UdpSocket, + replicate_socket: Arc, + repair_socket: Arc, + retransmit_socket: Arc, ledger_path: Option<&str>, exit: Arc, ) -> Self { - let repair_socket = Arc::new(repair_socket); let blob_recycler = BlobRecycler::default(); let (fetch_stage, blob_fetch_receiver) = BlobFetchStage::new_multi_socket( - vec![Arc::new(replicate_socket), repair_socket.clone()], + vec![replicate_socket, repair_socket.clone()], exit.clone(), &blob_recycler, ); @@ -95,7 +94,7 @@ impl Tvu { &crdt, window, entry_height, - Arc::new(retransmit_socket), + retransmit_socket, repair_socket, &blob_recycler, blob_fetch_receiver, @@ -166,7 +165,7 @@ pub mod tests { fn new_ncp( crdt: Arc>, - gossip: UdpSocket, + gossip: Arc, exit: Arc, ) -> (Ncp, SharedWindow) { let window = window::default_window(); @@ -179,7 +178,7 @@ pub mod tests { fn test_replicate() { logger::setup(); let leader = Node::new_localhost(); - let target1_keypair = Keypair::new(); + let target1_keypair = Arc::new(Keypair::new()); let target1 = Node::new_localhost_with_pubkey(target1_keypair.pubkey()); let target2 = Node::new_localhost(); let exit = Arc::new(AtomicBool::new(false)); @@ -206,7 +205,7 @@ pub mod tests { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = streamer::blob_receiver( - Arc::new(target2.sockets.replicate), + target2.sockets.replicate, exit.clone(), recv_recycler.clone(), s_reader, @@ -216,7 +215,7 @@ pub mod tests { let (s_responder, r_responder) = channel(); let t_responder = streamer::responder( "test_replicate", - Arc::new(leader.sockets.requests), + leader.sockets.requests, resp_recycler.clone(), r_responder, ); @@ -234,7 +233,7 @@ pub mod tests { let dr_1 = new_ncp(cref1.clone(), target1.sockets.gossip, exit.clone()); let tvu = Tvu::new( - target1_keypair, + &target1_keypair, &bank, 0, cref1, diff --git a/src/window.rs b/src/window.rs index 089ac4a4c19ec3..eb0aa0b9403d86 100644 --- a/src/window.rs +++ b/src/window.rs @@ -82,7 +82,7 @@ fn calculate_highest_lost_blob_index(num_peers: u64, consumed: u64, received: u6 let highest_lost = cmp::max(consumed, received.saturating_sub(num_peers)); // This check prevents repairing a blob that will cause window to roll over. Even if - // the highes_lost blob is actually missing, asking to repair it might cause our + // the highest_lost blob is actually missing, asking to repair it might cause our // current window to move past other missing blobs cmp::min(consumed + WINDOW_SIZE - 1, highest_lost) } @@ -789,7 +789,7 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver( - Arc::new(tn.sockets.gossip), + tn.sockets.gossip, exit.clone(), resp_recycler.clone(), s_reader, @@ -805,13 +805,13 @@ mod test { r_reader, s_window, s_retransmit, - Arc::new(tn.sockets.repair), + tn.sockets.repair, ); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder( "window_send_test", - Arc::new(tn.sockets.replicate), + tn.sockets.replicate, resp_recycler.clone(), r_responder, ); @@ -859,7 +859,7 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver( - Arc::new(tn.sockets.gossip), + tn.sockets.gossip, exit.clone(), resp_recycler.clone(), s_reader, @@ -875,13 +875,13 @@ mod test { r_reader, s_window, s_retransmit, - Arc::new(tn.sockets.repair), + tn.sockets.repair, ); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder( "window_send_test", - Arc::new(tn.sockets.replicate), + tn.sockets.replicate, resp_recycler.clone(), r_responder, ); @@ -922,7 +922,7 @@ mod test { let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); let t_receiver = blob_receiver( - Arc::new(tn.sockets.gossip), + tn.sockets.gossip, exit.clone(), resp_recycler.clone(), s_reader, @@ -938,13 +938,13 @@ mod test { r_reader, s_window, s_retransmit, - Arc::new(tn.sockets.repair), + tn.sockets.repair, ); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder( "window_send_test", - Arc::new(tn.sockets.replicate), + tn.sockets.replicate, resp_recycler.clone(), r_responder, ); diff --git a/src/write_stage.rs b/src/write_stage.rs index 8041e0414def63..681051bf0a7ba4 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -67,7 +67,7 @@ impl WriteStage { /// Create a new WriteStage for writing and broadcasting entries. pub fn new( - keypair: Keypair, + keypair: Arc, bank: Arc, crdt: Arc>, blob_recycler: BlobRecycler, diff --git a/tests/data_replicator.rs b/tests/data_replicator.rs index 408ce08bc64aa5..ef8ff9cdfff767 100644 --- a/tests/data_replicator.rs +++ b/tests/data_replicator.rs @@ -21,8 +21,9 @@ fn test_node(exit: Arc) -> (Arc>, Ncp, UdpSocket) { let crdt = Crdt::new(tn.info.clone()).expect("Crdt::new"); let c = Arc::new(RwLock::new(crdt)); let w = Arc::new(RwLock::new(vec![])); - let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit); - (c, d, tn.sockets.replicate) + let d = Ncp::new(&c.clone(), w, None, tn.sockets.gossip, exit).unwrap(); + let replicate = Arc::try_unwrap(tn.sockets.replicate).unwrap(); + (c, d, replicate) } /// Test that the network converges.