diff --git a/src/crdt.rs b/src/crdt.rs index e2303fbbe49acd..7029e79b21be9e 100644 --- a/src/crdt.rs +++ b/src/crdt.rs @@ -21,6 +21,7 @@ use rayon::prelude::*; use result::{Error, Result}; use ring::rand::{SecureRandom, SystemRandom}; use signature::{PublicKey, Signature}; +use std; use std::collections::HashMap; use std::io::Cursor; use std::net::{SocketAddr, UdpSocket}; @@ -194,7 +195,6 @@ impl Crdt { if nodes.len() < 1 { return Err(Error::CrdtTooSmall); } - info!("nodes table {}", nodes.len()); info!("blobs table {}", blobs.len()); // enumerate all the blobs, those are the indices @@ -295,6 +295,12 @@ impl Crdt { Ok(()) } + // max number of nodes that we could be converged to + pub fn convergence(&self) -> u64 { + let max = self.remote.values().len() as u64 + 1; + self.remote.values().fold(max, |a, b| std::cmp::min(a, *b)) + } + fn random() -> u64 { let rnd = SystemRandom::new(); let mut buf = [0u8; 8]; @@ -552,21 +558,16 @@ mod test { .map(|&(ref c, _)| Crdt::gossip(c.clone(), exit.clone())) .collect(); let mut done = true; - for _ in 0..(num * 32) { - done = true; + for i in 0..(num * 32) { + done = false; + trace!("round {}", i); for &(ref c, _) in listen.iter() { - trace!( - "done updates {} {}", - c.read().unwrap().table.len(), - c.read().unwrap().update_index - ); - //make sure the number of updates doesn't grow unbounded - assert!(c.read().unwrap().update_index <= num as u64); - //make sure we got all the updates - if c.read().unwrap().table.len() != num { - done = false; + if num == c.read().unwrap().convergence() as usize { + done = true; + break; } } + //at least 1 node converged if done == true { break; } @@ -590,6 +591,7 @@ mod test { #[test] #[ignore] fn gossip_ring_test() { + logger::setup(); run_gossip_topo(|listen| { let num = listen.len(); for n in 0..num { diff --git a/src/thin_client.rs b/src/thin_client.rs index 1979d42ff2adad..bdf43d319999ad 100644 --- a/src/thin_client.rs +++ b/src/thin_client.rs @@ -365,21 +365,16 @@ mod tests { let t_spy_listen = Crdt::listen(spy_ref.clone(), spy_window, spy_gossip, exit.clone()); let t_spy_gossip = Crdt::gossip(spy_ref.clone(), exit.clone()); //wait for the network to converge + let mut converged = false; for _ in 0..30 { - let len = spy_ref.read().unwrap().table.values().len(); - let mut min = num_nodes as u64; - for u in spy_ref.read().unwrap().remote.values() { - if min > *u { - min = *u; - } - } - info!("length {} {}", len, min); - if num_nodes == len && min >= (num_nodes as u64) { - warn!("converged! {} {}", len, min); + let num = spy_ref.read().unwrap().convergence(); + if num == num_nodes as u64 { + converged = true; break; } sleep(Duration::new(1, 0)); } + assert!(converged); threads.push(t_spy_listen); threads.push(t_spy_gossip); let v: Vec = spy_ref