Skip to content

Commit

Permalink
Partitioned validator and leader services, allow dynamic transition f…
Browse files Browse the repository at this point in the history
…rom validator <-> leader by shutting down one set of services and starting up the other
  • Loading branch information
carllin committed Sep 6, 2018
1 parent 6560b0e commit 219a33e
Show file tree
Hide file tree
Showing 15 changed files with 438 additions and 175 deletions.
2 changes: 1 addition & 1 deletion src/bin/bench-tps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ fn converge(
&spy_ref,
window.clone(),
None,
gossip_socket,
Arc::new(gossip_socket),
exit_signal.clone(),
);
let mut v: Vec<NodeInfo> = vec![];
Expand Down
3 changes: 1 addition & 2 deletions src/broadcast_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,12 @@ impl BroadcastStage {
/// See `crdt` for network layer definitions.
/// # Arguments
/// * `sock` - Socket to send from.
/// * `exit` - Boolean to signal system exit.
/// * `crdt` - CRDT structure
/// * `window` - Cache of blobs that we have broadcast
/// * `recycler` - Blob recycler.
/// * `receiver` - Receive channel for blobs to be retransmitted to all the layer 1 nodes.
pub fn new(
sock: UdpSocket,
sock: Arc<UdpSocket>,
crdt: Arc<RwLock<Crdt>>,
window: SharedWindow,
entry_height: u64,
Expand Down
48 changes: 24 additions & 24 deletions src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1261,14 +1261,14 @@ impl Crdt {
}

pub struct Sockets {
pub gossip: UdpSocket,
pub requests: UdpSocket,
pub replicate: UdpSocket,
pub transaction: UdpSocket,
pub respond: UdpSocket,
pub broadcast: UdpSocket,
pub repair: UdpSocket,
pub retransmit: UdpSocket,
pub gossip: Arc<UdpSocket>,
pub requests: Arc<UdpSocket>,
pub replicate: Arc<UdpSocket>,
pub transaction: Arc<UdpSocket>,
pub respond: Arc<UdpSocket>,
pub broadcast: Arc<UdpSocket>,
pub repair: Arc<UdpSocket>,
pub retransmit: Arc<UdpSocket>,
}

pub struct Node {
Expand Down Expand Up @@ -1301,14 +1301,14 @@ impl Node {
Node {
info,
sockets: Sockets {
gossip,
requests,
replicate,
transaction,
respond,
broadcast,
repair,
retransmit,
gossip: Arc::new(gossip),
requests: Arc::new(requests),
replicate: Arc::new(replicate),
transaction: Arc::new(transaction),
respond: Arc::new(respond),
broadcast: Arc::new(broadcast),
repair: Arc::new(repair),
retransmit: Arc::new(retransmit),
},
}
}
Expand Down Expand Up @@ -1362,14 +1362,14 @@ impl Node {
Node {
info,
sockets: Sockets {
gossip,
requests,
replicate,
transaction,
respond,
broadcast,
repair,
retransmit,
gossip: Arc::new(gossip),
requests: Arc::new(requests),
replicate: Arc::new(replicate),
transaction: Arc::new(transaction),
respond: Arc::new(respond),
broadcast: Arc::new(broadcast),
repair: Arc::new(repair),
retransmit: Arc::new(retransmit),
},
}
}
Expand Down
117 changes: 60 additions & 57 deletions src/drone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,66 +286,69 @@ mod tests {
let leader_data = leader.info.clone();
let ledger_path = tmp_ledger_path("send_airdrop");

let server = Fullnode::new_with_bank(
leader_keypair,
bank,
0,
&[],
leader,
None,
exit.clone(),
Some(&ledger_path),
false,
);
//TODO: this seems unstable
sleep(Duration::from_millis(900));

let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket");
addr.set_ip(get_ip_addr().expect("drone get_ip_addr"));
let mut drone = Drone::new(
alice.keypair(),
addr,
leader_data.contact_info.tpu,
leader_data.contact_info.rpu,
None,
Some(150_000),
);

let requests_socket = UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket");
let transactions_socket =
UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket");

let mut client = ThinClient::new(
leader_data.contact_info.rpu,
requests_socket,
leader_data.contact_info.tpu,
transactions_socket,
);

let bob_req = DroneRequest::GetAirdrop {
airdrop_request_amount: 50,
client_pubkey: bob_pubkey,
};
let bob_sig = drone.send_airdrop(bob_req).unwrap();
assert!(client.poll_for_signature(&bob_sig).is_ok());

let carlos_req = DroneRequest::GetAirdrop {
airdrop_request_amount: 5_000_000,
client_pubkey: carlos_pubkey,
};
let carlos_sig = drone.send_airdrop(carlos_req).unwrap();
assert!(client.poll_for_signature(&carlos_sig).is_ok());
{
let server = Fullnode::new_with_bank(
leader_keypair,
bank,
0,
&[],
leader,
None,
exit.clone(),
Some(&ledger_path),
false,
);
//TODO: this seems unstable
sleep(Duration::from_millis(900));

let mut addr: SocketAddr = "0.0.0.0:9900".parse().expect("bind to drone socket");
addr.set_ip(get_ip_addr().expect("drone get_ip_addr"));
let mut drone = Drone::new(
alice.keypair(),
addr,
leader_data.contact_info.tpu,
leader_data.contact_info.rpu,
None,
Some(150_000),
);

let bob_balance = client.get_balance(&bob_pubkey);
info!("Small request balance: {:?}", bob_balance);
assert_eq!(bob_balance.unwrap(), SMALL_BATCH);
let requests_socket =
UdpSocket::bind("0.0.0.0:0").expect("drone bind to requests socket");
let transactions_socket =
UdpSocket::bind("0.0.0.0:0").expect("drone bind to transactions socket");

let carlos_balance = client.get_balance(&carlos_pubkey);
info!("TPS request balance: {:?}", carlos_balance);
assert_eq!(carlos_balance.unwrap(), TPS_BATCH);
let mut client = ThinClient::new(
leader_data.contact_info.rpu,
requests_socket,
leader_data.contact_info.tpu,
transactions_socket,
);

exit.store(true, Ordering::Relaxed);
server.join().unwrap();
let bob_req = DroneRequest::GetAirdrop {
airdrop_request_amount: 50,
client_pubkey: bob_pubkey,
};
let bob_sig = drone.send_airdrop(bob_req).unwrap();
assert!(client.poll_for_signature(&bob_sig).is_ok());

let carlos_req = DroneRequest::GetAirdrop {
airdrop_request_amount: 5_000_000,
client_pubkey: carlos_pubkey,
};
let carlos_sig = drone.send_airdrop(carlos_req).unwrap();
assert!(client.poll_for_signature(&carlos_sig).is_ok());

let bob_balance = client.get_balance(&bob_pubkey);
info!("Small request balance: {:?}", bob_balance);
assert_eq!(bob_balance.unwrap(), SMALL_BATCH);

let carlos_balance = client.get_balance(&carlos_pubkey);
info!("TPS request balance: {:?}", carlos_balance);
assert_eq!(carlos_balance.unwrap(), TPS_BATCH);

exit.store(true, Ordering::Relaxed);
server.join().unwrap();
}
remove_dir_all(ledger_path).unwrap();
}
}
Loading

0 comments on commit 219a33e

Please sign in to comment.