diff --git a/core/src/crds.rs b/core/src/crds.rs index 37c02f794bfa88..e07e9f328e9405 100644 --- a/core/src/crds.rs +++ b/core/src/crds.rs @@ -24,6 +24,7 @@ //! A value is updated to a new version if the labels match, and the value //! wallclock is later, or the value hash is greater. +use crate::crds_gossip_pull::CrdsFilter; use crate::crds_value::{CrdsValue, CrdsValueLabel}; use bincode::serialize; use indexmap::map::IndexMap; @@ -37,6 +38,8 @@ pub struct Crds { /// Stores the map of labels and values pub table: IndexMap, pub num_inserts: usize, + + pub masks: IndexMap, } #[derive(PartialEq, Debug)] @@ -86,6 +89,7 @@ impl Default for Crds { Crds { table: IndexMap::new(), num_inserts: 0, + masks: IndexMap::new(), } } } @@ -126,6 +130,10 @@ impl Crds { .map(|current| new_value > *current) .unwrap_or(true); if do_insert { + self.masks.insert( + label.clone(), + CrdsFilter::hash_as_u64(&new_value.value_hash), + ); let old = self.table.insert(label, new_value); self.num_inserts += 1; Ok(old) @@ -193,6 +201,7 @@ impl Crds { pub fn remove(&mut self, key: &CrdsValueLabel) { self.table.swap_remove(key); + self.masks.swap_remove(key); } } diff --git a/core/src/crds_gossip_pull.rs b/core/src/crds_gossip_pull.rs index 9191568a63ae67..6873a0bcb47d79 100644 --- a/core/src/crds_gossip_pull.rs +++ b/core/src/crds_gossip_pull.rs @@ -91,7 +91,7 @@ impl CrdsFilter { // for small ratios this can result in a negative number, ensure it returns 0 instead ((num_items / max_items).log2().ceil()).max(0.0) as u32 } - fn hash_as_u64(item: &Hash) -> u64 { + pub fn hash_as_u64(item: &Hash) -> u64 { let arr = item.as_ref(); let mut accum = 0; for (i, val) in arr.iter().enumerate().take(8) { @@ -99,6 +99,10 @@ impl CrdsFilter { } accum } + pub fn test_mask_u64(&self, item: u64, ones: u64) -> bool { + let bits = item | ones; + bits == self.mask + } pub fn test_mask(&self, item: &Hash) -> bool { // only consider the highest mask_bits bits from the hash and set the rest to 1. let ones = (!0u64).checked_shr(self.mask_bits).unwrap_or(!0u64); @@ -111,9 +115,6 @@ impl CrdsFilter { } } pub fn contains(&self, item: &Hash) -> bool { - if !self.test_mask(item) { - return true; - } self.filter.contains(item) } } @@ -395,16 +396,25 @@ impl CrdsGossipPull { return ret; } let mut total_skipped = 0; - for v in crds.table.values() { - recent.iter().enumerate().for_each(|(i, (caller, filter))| { - //skip values that are too new - if v.value.wallclock() > caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0) - { - total_skipped += 1; - return; - } - if !filter.contains(&v.value_hash) { - ret[i].push(v.value.clone()); + let mask_ones: Vec<_> = recent.iter().map(|(_caller, filter)| { + (!0u64).checked_shr(filter.mask_bits).unwrap_or(!0u64) + }).collect(); + for (label, mask) in crds.masks.iter() { + recent.iter().zip(mask_ones.iter()).enumerate().for_each(|(i, ((caller, filter), mask_ones))| { + if filter.test_mask_u64(*mask, *mask_ones) { + let item = crds.table.get(label).unwrap(); + + //skip values that are too new + if item.value.wallclock() + > caller.wallclock().checked_add(jitter).unwrap_or_else(|| 0) + { + total_skipped += 1; + return; + } + + if !filter.contains(&item.value_hash) { + ret[i].push(item.value.clone()); + } } }); }