Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

address_store: Improve address tracking and add eviction algorithm #250

Merged
merged 31 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
59dbbf8
address: Add score constants to the address store
lexnv Sep 24, 2024
ae7799a
address: Convert DialError to appropriate score
lexnv Sep 24, 2024
ac5d731
address: Implement eviction algorithm for the AddressStore
lexnv Sep 24, 2024
a2356ec
manager: Keep track of the address for future dials
lexnv Sep 24, 2024
5355e85
manager: Make add_known_address more robust to track multiple peer addrs
lexnv Sep 24, 2024
ecb436b
manager: Track addresses better for dial and open failure
lexnv Sep 24, 2024
bf066d1
manager: Update addresses on connection established
lexnv Sep 24, 2024
b052c5d
manager/handle: Keep lock for shorter time
lexnv Sep 24, 2024
da52056
address/tests: Adjust testing
lexnv Sep 24, 2024
6c7f399
address/tests: Add tests for eviction and insertion
lexnv Sep 24, 2024
5b88ea4
manager/tests: Adjust testing to new address store interface
lexnv Sep 24, 2024
c97ffaa
address/tests: Use >= for random ranges of scores
lexnv Sep 24, 2024
126f93d
manager: Evit addresses only on adress capacity reached
lexnv Oct 24, 2024
3583ea3
manager: Fix dialError timeout score
lexnv Oct 24, 2024
7cddf2e
manager: Simplify error to score method
lexnv Oct 24, 2024
0e707b6
manager: Warn about different PeerID address tracking
lexnv Oct 24, 2024
ba3757a
manager: Refactor continue with if else
lexnv Oct 24, 2024
db095f4
manager: Replace trace with warning
lexnv Oct 24, 2024
815d543
manager: Adjust unfinished comment
lexnv Oct 24, 2024
d98b9b6
manager: Remove PeerIdMissmatch logic
lexnv Oct 24, 2024
42ce6cc
manager: Evict lower score addresses
lexnv Oct 24, 2024
97fd413
manager/tests: Adjust testing
lexnv Oct 24, 2024
4ec27d2
Merge remote-tracking branch 'origin/master' into lexnv/enhanve-addre…
lexnv Oct 24, 2024
a6af7b3
Merge branch 'master' into lexnv/enhanve-address-store
lexnv Oct 24, 2024
3d52019
Update src/transport/manager/address.rs
lexnv Oct 25, 2024
2bd8621
manager: Only track the addresses corresponding to the provided peer
lexnv Oct 25, 2024
8be6a1e
address: Replace num_addr with addresses.len() for clarity
lexnv Oct 25, 2024
23c33dc
Merge remote-tracking branch 'origin/lexnv/enhanve-address-store' int…
lexnv Oct 25, 2024
c779ded
Update src/transport/manager/handle.rs
lexnv Oct 25, 2024
f2c5e8f
manager: Rewrite with expect
lexnv Oct 25, 2024
f124b79
manager: Avoid downcasting multiaddr
lexnv Oct 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 187 additions & 57 deletions src/transport/manager/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,37 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{types::ConnectionId, PeerId};
use crate::{
error::{DialError, NegotiationError},
types::ConnectionId,
PeerId,
};

use multiaddr::{Multiaddr, Protocol};
use multihash::Multihash;

use std::collections::{BinaryHeap, HashSet};
use std::collections::{hash_map::Entry, HashMap};

/// Maximum number of addresses tracked for a peer.
const MAX_ADDRESSES: usize = 64;

/// Scores for address records.
pub mod scores {
/// Score indicating that the connection was successfully established.
pub const CONNECTION_ESTABLISHED: i32 = 100i32;

/// Score for a connection with a peer using a different ID than expected.
pub const DIFFERENT_PEER_ID: i32 = 50i32;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Realistically we can't establish a connection to this peer, so I would make the score the same as CONNECTION_FAILURE.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep makes sense! Have simplified the code kept around 1 connection failure and 1 address failure which punishes more, thanks for the suggestion! 🙏


/// Score for failing to connect due to an invalid or unreachable address.
pub const CONNECTION_FAILURE: i32 = -100i32;

/// Score for a connection attempt that failed due to a timeout.
pub const TIMEOUT_FAILURE: i32 = -50i32;
}

/// Remove the address from the store if the score is below this threshold.
const REMOVE_THRESHOLD: i32 = scores::CONNECTION_FAILURE * 2;
lexnv marked this conversation as resolved.
Show resolved Hide resolved

#[allow(clippy::derived_hash_with_manual_eq)]
#[derive(Debug, Clone, Hash)]
Expand Down Expand Up @@ -134,11 +159,10 @@ impl Ord for AddressRecord {
/// Store for peer addresses.
#[derive(Debug)]
pub struct AddressStore {
//// Addresses sorted by score.
pub by_score: BinaryHeap<AddressRecord>,

/// Addresses queryable by hashing them for faster lookup.
pub by_address: HashSet<Multiaddr>,
/// Addresses available.
pub addresses: HashMap<Multiaddr, AddressRecord>,
/// Maximum capacity of the address store.
max_capacity: usize,
}

impl FromIterator<Multiaddr> for AddressStore {
Expand All @@ -158,8 +182,7 @@ impl FromIterator<AddressRecord> for AddressStore {
fn from_iter<T: IntoIterator<Item = AddressRecord>>(iter: T) -> Self {
let mut store = AddressStore::new();
for record in iter {
store.by_address.insert(record.address.clone());
store.by_score.push(record);
store.insert(record);
}

store
Expand All @@ -186,52 +209,84 @@ impl AddressStore {
/// Create new [`AddressStore`].
pub fn new() -> Self {
Self {
by_score: BinaryHeap::new(),
by_address: HashSet::new(),
addresses: HashMap::with_capacity(MAX_ADDRESSES),
max_capacity: MAX_ADDRESSES,
}
}

/// Check if [`AddressStore`] is empty.
pub fn is_empty(&self) -> bool {
self.by_score.is_empty()
/// Get the score for a given error.
pub fn error_score(error: &DialError) -> i32 {
match error {
DialError::Timeout => scores::CONNECTION_ESTABLISHED,
lexnv marked this conversation as resolved.
Show resolved Hide resolved
DialError::AddressError(_) => scores::CONNECTION_FAILURE,
lexnv marked this conversation as resolved.
Show resolved Hide resolved
DialError::DnsError(_) => scores::CONNECTION_FAILURE,
lexnv marked this conversation as resolved.
Show resolved Hide resolved
DialError::NegotiationError(negotiation_error) => match negotiation_error {
NegotiationError::PeerIdMismatch(_, _) => scores::DIFFERENT_PEER_ID,
// Timeout during the negotiation phase.
NegotiationError::Timeout => scores::TIMEOUT_FAILURE,
// Treat other errors as connection failures.
_ => scores::CONNECTION_FAILURE,
},
}
}

/// Check if address is already in the a
pub fn contains(&self, address: &Multiaddr) -> bool {
self.by_address.contains(address)
/// Check if [`AddressStore`] is empty.
pub fn is_empty(&self) -> bool {
self.addresses.is_empty()
}

/// Insert new address record into [`AddressStore`] with default address score.
pub fn insert(&mut self, mut record: AddressRecord) {
if self.by_address.contains(record.address()) {
/// Insert the address record into [`AddressStore`] with the provided score.
///
/// If the address is not in the store, it will be inserted.
/// Otherwise, the score and connection ID will be updated.
pub fn insert(&mut self, record: AddressRecord) {
let num_addresses = self.addresses.len();

if let Entry::Occupied(mut occupied) = self.addresses.entry(record.address.clone()) {
occupied.get_mut().update_score(record.score);
if occupied.get().score <= REMOVE_THRESHOLD {
occupied.remove();
}
return;
}

record.connection_id = None;
self.by_address.insert(record.address.clone());
self.by_score.push(record);
}

/// Pop address with the highest score from [`AddressStore`].
pub fn pop(&mut self) -> Option<AddressRecord> {
self.by_score.pop().map(|record| {
self.by_address.remove(&record.address);
record
})
}

/// Take at most `limit` `AddressRecord`s from [`AddressStore`].
pub fn take(&mut self, limit: usize) -> Vec<AddressRecord> {
let mut records = Vec::new();
// The eviction algorithm favours addresses with higher scores.
//
// This algorithm has the following implications:
// - it keeps the best addresses in the store.
// - if the store is at capacity, the worst address will be evicted.
// - an address that is not dialed yet (with score zero) will be preferred over an address
// that already failed (with negative score).
if num_addresses >= self.max_capacity {
// No need to keep track of negative addresses if we are at capacity.
if record.score < 0 {
return;
}
lexnv marked this conversation as resolved.
Show resolved Hide resolved

for _ in 0..limit {
match self.pop() {
Some(record) => records.push(record),
None => break,
let Some(min_record) = self.addresses.values().min().cloned() else {
return;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't we lose the address due to returning if the values() are empty?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The num_addresses is initialized to self.addresses.len() 🤔

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have removed the num_addreses variable

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a chance of getting None here? May be expect() with a comment then?

};
// The lowest score is better than the new record.
if record.score < min_record.score {
return;
}
self.addresses.remove(min_record.address());
}

records
// There's no need to keep track of this address if the score is below the threshold.
if record.score <= REMOVE_THRESHOLD {
return;
}

// Insert the record.
self.addresses.insert(record.address.clone(), record);
}

/// Return the available addresses sorted by score.
pub fn addresses(&self, limit: usize) -> Vec<Multiaddr> {
let mut records = self.addresses.values().cloned().collect::<Vec<_>>();
records.sort_by(|lhs, rhs| rhs.score.cmp(&lhs.score));
records.into_iter().take(limit).map(|record| record.address).collect()
Comment on lines +259 to +262
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I actually liked in the original implementation, is that popping and putting elements in a binary heap is super fast. Unfortunately, that required taking the address out before the dial attempt and putting it back after.

Ideally, we would use something like BiMap over addresses and scores. As scores can be identical, more strictly we need BiMultiBTreeHashMap, which is too peculiar to have a decent existing implementation.

Mostly thinking out loud. We can keep this for a follow-up.

}
}

Expand All @@ -256,7 +311,7 @@ mod tests {
),
rng.gen_range(1..=65535),
));
let score: i32 = rng.gen();
let score: i32 = rng.gen_range(10..=200);

AddressRecord::new(
&peer,
Expand All @@ -279,7 +334,7 @@ mod tests {
),
rng.gen_range(1..=65535),
));
let score: i32 = rng.gen();
let score: i32 = rng.gen_range(10..=200);

AddressRecord::new(
&peer,
Expand All @@ -303,7 +358,7 @@ mod tests {
),
rng.gen_range(1..=65535),
));
let score: i32 = rng.gen();
let score: i32 = rng.gen_range(10..=200);

AddressRecord::new(
&peer,
Expand Down Expand Up @@ -331,19 +386,22 @@ mod tests {
store.insert(quic_address_record(&mut rng));
}

let known_addresses = store.by_address.len();
let known_addresses = store.addresses.len();
assert!(known_addresses >= 3);

let taken = store.take(known_addresses - 2);
let taken = store.addresses(known_addresses - 2);
assert_eq!(known_addresses - 2, taken.len());
assert!(!store.is_empty());

let mut prev: Option<AddressRecord> = None;
for record in taken {
assert!(!store.contains(record.address()));
for address in taken {
// Addresses are still in the store.
assert!(store.addresses.contains_key(&address));

let record = store.addresses.get(&address).unwrap().clone();

if let Some(previous) = prev {
assert!(previous.score > record.score);
assert!(previous.score >= record.score);
}

prev = Some(record);
Expand All @@ -359,18 +417,19 @@ mod tests {
store.insert(ws_address_record(&mut rng));
store.insert(quic_address_record(&mut rng));

assert_eq!(store.by_address.len(), 3);
assert_eq!(store.addresses.len(), 3);

let taken = store.take(8usize);
let taken = store.addresses(8usize);
assert_eq!(taken.len(), 3);
assert!(store.is_empty());

let mut prev: Option<AddressRecord> = None;
for record in taken {
let record = store.addresses.get(&record).unwrap().clone();

if prev.is_none() {
prev = Some(record);
} else {
assert!(prev.unwrap().score > record.score);
assert!(prev.unwrap().score >= record.score);
prev = Some(record);
}
}
Expand Down Expand Up @@ -401,10 +460,9 @@ mod tests {
.collect::<HashMap<_, _>>();
store.extend(records);

for record in store.by_score {
for record in store.addresses.values().cloned() {
let stored = cloned.get(record.address()).unwrap();
assert_eq!(stored.score(), record.score());
assert_eq!(stored.connection_id(), record.connection_id());
assert_eq!(stored.address(), record.address());
}
}
Expand Down Expand Up @@ -433,11 +491,83 @@ mod tests {
let cloned = records.iter().cloned().collect::<HashMap<_, _>>();
store.extend(records.iter().map(|(_, record)| record));

for record in store.by_score {
for record in store.addresses.values().cloned() {
let stored = cloned.get(record.address()).unwrap();
assert_eq!(stored.score(), record.score());
assert_eq!(stored.connection_id(), record.connection_id());
assert_eq!(stored.address(), record.address());
}
}

#[test]
fn insert_record() {
let mut store = AddressStore::new();
let mut rng = rand::thread_rng();

let mut record = tcp_address_record(&mut rng);
record.score = 10;

store.insert(record.clone());

assert_eq!(store.addresses.len(), 1);
assert_eq!(store.addresses.get(record.address()).unwrap(), &record);

// This time the record is updated.
store.insert(record.clone());

assert_eq!(store.addresses.len(), 1);
let store_record = store.addresses.get(record.address()).unwrap();
assert_eq!(store_record.score, record.score * 2);
}

#[test]
fn evict_below_threshold() {
let mut store = AddressStore::new();
let mut rng = rand::thread_rng();

let mut record = tcp_address_record(&mut rng);
record.score = scores::CONNECTION_FAILURE;
store.insert(record.clone());

assert_eq!(store.addresses.len(), 1);

store.insert(record.clone());

assert_eq!(store.addresses.len(), 0);
}

#[test]
fn evict_on_capacity() {
let mut store = AddressStore {
addresses: HashMap::new(),
max_capacity: 2,
};

let mut rng = rand::thread_rng();
let mut first_record = tcp_address_record(&mut rng);
first_record.score = scores::CONNECTION_ESTABLISHED;
let mut second_record = ws_address_record(&mut rng);
second_record.score = 0;

store.insert(first_record.clone());
store.insert(second_record.clone());

assert_eq!(store.addresses.len(), 2);

// We have better addresses, ignore this one.
let mut third_record = quic_address_record(&mut rng);
third_record.score = scores::CONNECTION_FAILURE;
store.insert(third_record.clone());
assert_eq!(store.addresses.len(), 2);
assert!(store.addresses.contains_key(first_record.address()));
assert!(store.addresses.contains_key(second_record.address()));

// Evict the address with the lowest score.
let mut fourth_record = quic_address_record(&mut rng);
fourth_record.score = scores::DIFFERENT_PEER_ID;
store.insert(fourth_record.clone());

assert_eq!(store.addresses.len(), 2);
assert!(store.addresses.contains_key(first_record.address()));
assert!(store.addresses.contains_key(fourth_record.address()));
}
}
Loading