Skip to content

Commit

Permalink
feat: try evict inbound peers when inbound slots is full
Browse files Browse the repository at this point in the history
  • Loading branch information
jjyr committed Dec 18, 2018
1 parent e1e5750 commit d0db77e
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 180 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Nervos Core Dev <[email protected]>"]
edition = "2018"

[dependencies]
rand = "0.5"
rand = "0.6"
fnv = "1.0"
serde = "1.0"
serde_derive = "1.0"
Expand Down
29 changes: 18 additions & 11 deletions network/src/ckb_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::Network;
use crate::PeerId;
use futures::future::{self, Future};
use futures::Stream;
use libp2p::core::{Multiaddr, UniqueConnecState};
use libp2p::core::{Endpoint, Multiaddr, UniqueConnecState};
use libp2p::kad;
use log::{error, info};
use std::boxed::Box;
Expand All @@ -36,16 +36,23 @@ impl CKBService {
let protocol_version = protocol_output.protocol_version;
let endpoint = protocol_output.endpoint;
// get peer protocol_connection
let protocol_connec =
match network.ckb_protocol_connec(&peer_id, protocol_id, endpoint, addr) {
Ok(protocol_connec) => protocol_connec,
Err(err) => {
return Box::new(future::err(IoError::new(
IoErrorKind::Other,
format!("handle ckb_protocol connection error: {}", err),
))) as Box<Future<Item = (), Error = IoError> + Send>
let protocol_connec = {
let result = match endpoint {
Endpoint::Dialer => {
network.try_outbound_ckb_protocol_connec(&peer_id, protocol_id, addr)
}
Endpoint::Listener => {
network.try_inbound_ckb_protocol_connec(&peer_id, protocol_id, addr)
}
};
if let Err(err) = result {
return Box::new(future::err(IoError::new(
IoErrorKind::Other,
format!("handle ckb_protocol connection error: {}", err),
))) as Box<Future<Item = (), Error = IoError> + Send>;
}
result.unwrap()
};
if protocol_connec.state() == UniqueConnecState::Full {
error!(
target: "network",
Expand Down Expand Up @@ -107,7 +114,7 @@ impl CKBService {
{
let mut peer_store = network.peer_store().write();
peer_store.report(&peer_id, Behaviour::UnexpectedDisconnect);
peer_store.report_status(&peer_id, Status::Disconnected);
peer_store.update_status(&peer_id, Status::Disconnected);
}
protocol_handler.disconnected(
Box::new(DefaultCKBProtocolContext::new(
Expand All @@ -130,7 +137,7 @@ impl CKBService {
{
let mut peer_store = network.peer_store().write();
peer_store.report(&peer_id, Behaviour::Connect);
peer_store.report_status(&peer_id, Status::Connected);
peer_store.update_status(&peer_id, Status::Connected);
}
{
let handle_connected = future::lazy(move || {
Expand Down
42 changes: 11 additions & 31 deletions network/src/memory_peer_store.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::peer_store::{Behaviour, PeerStore, Status};
use crate::peer_store::{Behaviour, PeerStore, Score, Status};
use crate::PeerId;
use fnv::FnvHashMap;
use libp2p::core::Multiaddr;
Expand All @@ -23,15 +23,13 @@ struct PeerInfo {
pub struct MemoryPeerStore {
bootnodes: Vec<(PeerId, Multiaddr)>,
peers: FnvHashMap<PeerId, PeerInfo>,
reserved_nodes: FnvHashMap<PeerId, Vec<Multiaddr>>,
}

impl MemoryPeerStore {
pub fn new(bootnodes: Vec<(PeerId, Multiaddr)>) -> Self {
let mut peer_store = MemoryPeerStore {
bootnodes: bootnodes.clone(),
peers: Default::default(),
reserved_nodes: Default::default(),
};
for (peer_id, addr) in bootnodes {
peer_store.add_peer(peer_id, vec![addr]);
Expand All @@ -56,6 +54,11 @@ impl MemoryPeerStore {
}

impl PeerStore for MemoryPeerStore {
fn add_discovered_address(&mut self, peer_id: &PeerId, address: Multiaddr) -> Result<(), ()> {
self.add_discovered_addresses(peer_id, vec![address])
.map(|_| ())
}

fn add_discovered_addresses(
&mut self,
peer_id: &PeerId,
Expand All @@ -78,10 +81,7 @@ impl PeerStore for MemoryPeerStore {
}
// TODO
fn report(&mut self, _peer_id: &PeerId, _behaviour: Behaviour) {}
// TODO
fn report_address(&mut self, _address: &Multiaddr, _behaviour: Behaviour) {}
// TODO
fn report_status(&mut self, peer_id: &PeerId, status: Status) {
fn update_status(&mut self, peer_id: &PeerId, status: Status) {
if let Some(peer) = self.peers.get_mut(&peer_id) {
let now = Instant::now();
peer.last_updated_at = now;
Expand All @@ -95,6 +95,10 @@ impl PeerStore for MemoryPeerStore {
None => Status::Unknown,
}
}
//TODO
fn peer_score(&self, _peer_id: &PeerId) -> Score {
0
}

fn bootnodes<'a>(&'a self) -> Box<Iterator<Item = (&'a PeerId, &'a Multiaddr)> + 'a> {
let mut bootnodes = self
Expand Down Expand Up @@ -130,28 +134,4 @@ impl PeerStore for MemoryPeerStore {
});
Box::new(peers) as Box<_>
}

fn reserved_nodes<'a>(&'a self) -> Box<Iterator<Item = (&'a PeerId, &'a Multiaddr)> + 'a> {
let iter =
self.reserved_nodes
.iter()
.filter_map(move |(peer_id, addresses)| match addresses.get(0) {
Some(address) => Some((peer_id, address)),
None => None,
});
Box::new(iter) as Box<_>
}
fn is_reserved(&self, peer_id: &PeerId) -> bool {
self.reserved_nodes.contains_key(peer_id)
}
fn add_reserved_node(
&mut self,
peer_id: PeerId,
addresses: Vec<Multiaddr>,
) -> Option<Vec<Multiaddr>> {
self.reserved_nodes.insert(peer_id, addresses)
}
fn remove_reserved_node(&mut self, peer_id: &PeerId) -> Option<Vec<Multiaddr>> {
self.reserved_nodes.remove(peer_id)
}
}
Loading

0 comments on commit d0db77e

Please sign in to comment.