Skip to content

Commit

Permalink
wait for network to converge
Browse files Browse the repository at this point in the history
  • Loading branch information
aeyakovenko committed May 6, 2018
1 parent 920a60f commit 7d10094
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/accountant_skel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ impl AccountantSkel {
) -> Result<()> {
let timer = Duration::new(1, 0);
let blobs = verified_receiver.recv_timeout(timer)?;
info!("replicating blobs {}", blobs.len());
trace!("replicating blobs {}", blobs.len());
for msgs in &blobs {
let blob = msgs.read().unwrap();
let entries: Vec<Entry> = deserialize(&blob.data()[..blob.meta.size]).unwrap();
Expand Down
44 changes: 42 additions & 2 deletions src/accountant_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ mod tests {
use super::*;
use accountant::Accountant;
use accountant_skel::AccountantSkel;
use crdt::ReplicatedData;
use crdt::{ReplicatedData, Crdt};
use futures::Future;
use historian::Historian;
use mint::Mint;
use signature::{KeyPair, KeyPairUtil};
use std::io::sink;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::sync_channel;
use std::sync::Arc;
use std::sync::{RwLock, Arc};
use std::thread::sleep;
use std::time::Duration;
use logger;
Expand Down Expand Up @@ -250,6 +250,43 @@ mod tests {
leader.0.clone(),
exit.clone(),
).unwrap();

//lets spy on the network
let (mut spy,spy_gossip,_,_) = test_node();
let daddr = "0.0.0.0:0".parse().unwrap();
spy.replicate_addr = daddr;
spy.serve_addr = daddr;
let mut spy_crdt = Crdt::new(spy);
spy_crdt.insert(leader.0.clone());
spy_crdt.set_leader(leader.0.id);

let spy_ref = Arc::new(RwLock::new(spy_crdt));
let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_gossip, exit.clone());
let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone());
//wait for the network to converge
for _ in 0 .. 20 {
let ix = spy_ref.read().unwrap().update_index;
info!("my update index is {}", ix);
let len = spy_ref.read().unwrap().remote.values().len();
let mut done = false;
info!("remote len {}", len);
if len > 1 {
//check if everyones remote index is greater or equal to ours
for t in spy_ref.read().unwrap().remote.values() {
info!("remote update index is {}", *t);
if ix > 2 && *t > 2 && *t >= ix {
done = true;
break;
}
}
}
if done == true {
info!("converged!");
break;
}
sleep(Duration::new(1, 0));
}

//verify leader can do transfer
let leader_balance = {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
Expand Down Expand Up @@ -291,5 +328,8 @@ mod tests {
for t in replicant_threads {
t.join().unwrap();
}
for t in vec![t_spy_listen, t_spy_gossip] {
t.join().unwrap();
}
}
}
2 changes: 1 addition & 1 deletion src/crdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pub struct Crdt {
local: HashMap<PublicKey, u64>,
/// The value of the remote update index that i have last seen
/// This Node will ask external nodes for updates since the value in this list
remote: HashMap<PublicKey, u64>,
pub remote: HashMap<PublicKey, u64>,
pub update_index: u64,
me: PublicKey,
timeout: Duration,
Expand Down

0 comments on commit 7d10094

Please sign in to comment.