Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Upgrade libp2p 0.19.0 #169

Merged
merged 32 commits into from
May 19, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
cd46721
chore: upgrade the libp2p to 0.18.(0|1)
May 5, 2020
02fb206
chore: fix bitswap
May 5, 2020
6eecf7c
chore: fix pubsub
May 5, 2020
b3788e7
refactoring: remove the empty match on SwarmEvent
May 8, 2020
8acdd9e
refactor: fix most of the swarm with todos
May 8, 2020
046c104
add missing Clone bound on the Network to compile
May 8, 2020
7216bb9
refactor: remove warnings
May 8, 2020
2624941
chore cargo fmt
May 8, 2020
be6686d
feat: add ability to configure listening addr
May 8, 2020
e9989a3
add test case for the panic with two connections
May 8, 2020
f6cdd0f
fix: allow multiple connections in p2p/swarm
May 8, 2020
8563d8c
refactor: formatting and simplification
May 8, 2020
d443290
doc: Connection and the any connecting address
May 8, 2020
b0b9840
chore: delegate all networkbeh methods to floodsub
May 9, 2020
44fb8d8
chore: cargo fmt
May 11, 2020
3373191
refactor: add better assertion messages at connect_two
May 15, 2020
4d8ecbd
chore: cleanup libp2p patching examples
May 18, 2020
50d92f6
refactor: restore iterator returning SwarmApi::connections
May 18, 2020
6dc023e
refactor: rename SwarmApi::stats
May 18, 2020
b72a19e
fix: remove too eager peer removal
May 18, 2020
9afe9b1
refactor: use suggested std methods
May 18, 2020
752b8ac
chore cargo fmt
May 18, 2020
006de71
refactor: remove if with unwrap_or_default
May 18, 2020
afa58d3
chore: upgrade to libp2p 0.19
May 19, 2020
bbb952a
fix: need do accounting when banning peer
May 19, 2020
c72eb47
chore: upgrade to 0.19
May 19, 2020
ea080ff
refactor: cargo fmt and fix log messages
May 19, 2020
2dac944
fix: awaiting for binding new listening addresses
May 19, 2020
62f12c9
chore: cargo fmt
May 19, 2020
fb60e97
refactor: use hashmap.entry per clippy suggestion
May 19, 2020
dfa6a9b
refactor: clippy and comment adjustment
May 19, 2020
0d80f21
fix test on macos, fix unspecified addr handling
May 19, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 90 additions & 99 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ dirs = "2.0.2"
domain = { git = "https://github.com/nlnetlabs/domain", rev="084964", features = ["resolv"] }
futures = { version = "0.3.4", features = ["compat", "io-compat"] }
libipld = { git = "https://github.com/ipfs-rust/rust-ipld", rev = "b2286c53c13f3eeec2a3766387f2926838e8e4c9", features = ["dag-pb", "dag-json"] }
libp2p = "0.16.2"
libp2p = "0.18.1"
log = "0.4.8"
multibase = "0.8.0"
multihash = "0.10.1"
Expand All @@ -45,7 +45,7 @@ members = [ "bitswap", "http", "examples" ]
ctr = { git = "https://github.com/koivunej/stream-ciphers.git", branch = "ctr128-64to128" }

# these are needed for the floodsub local originated messages to be seen by subscribers
libp2p = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
libp2p-core = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
libp2p-swarm = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
libp2p-floodsub = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
# libp2p = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
# libp2p-core = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
# libp2p-swarm = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
# libp2p-floodsub = { git = "https://github.com/koivunej/rust-libp2p.git", branch = "v0.16.2-with-floodsub-opts" }
koivunej marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions bitswap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ async-trait = "0.1.29"
fnv = "1.0.6"
futures = "0.3.4"
libipld = "0.1.0"
libp2p-core = "0.16.0"
libp2p-swarm = "0.16.1"
libp2p-core = "0.18.0"
libp2p-swarm = "0.18.1"
log = "0.4.8"
multihash = "0.10.1"
prost = "0.6.1"
Expand Down
93 changes: 57 additions & 36 deletions bitswap/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ use fnv::FnvHashSet;
use futures::task::Context;
use futures::task::Poll;
use libipld::cid::Cid;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId};
use libp2p_core::{connection::ConnectionId, Multiaddr, PeerId};
use libp2p_swarm::protocols_handler::{IntoProtocolsHandler, OneShotHandler, ProtocolsHandler};
use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
use libp2p_swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
};
use std::collections::{HashMap, VecDeque};

/// Network behaviour that handles sending and receiving IPFS blocks.
Expand Down Expand Up @@ -94,10 +96,11 @@ impl<TStrategy> Bitswap<TStrategy> {
debug!("bitswap: connect");
if self.target_peers.insert(peer_id.clone()) {
debug!(" queuing dial_peer to {}", peer_id.to_base58());
self.events
.push_back(NetworkBehaviourAction::DialPeer { peer_id });
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
});
}
debug!("");
}

/// Sends a block to the peer.
Expand All @@ -111,10 +114,12 @@ impl<TStrategy> Bitswap<TStrategy> {
.expect("Peer not in ledger?!");
let message = ledger.send_block(block);
debug!(" queuing block for {}", peer_id.to_base58());
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id,
event: message,
});
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: message,
handler: NotifyHandler::Any,
});
debug!("");
}

Expand All @@ -127,10 +132,12 @@ impl<TStrategy> Bitswap<TStrategy> {
message.want_block(cid, *priority);
}
debug!(" queuing wanted blocks");
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id,
event: message,
});
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: message,
handler: NotifyHandler::Any,
});
}
}

Expand All @@ -142,10 +149,12 @@ impl<TStrategy> Bitswap<TStrategy> {
for (peer_id, ledger) in self.connected_peers.iter_mut() {
let message = ledger.want_block(&cid, priority);
debug!(" queuing want for {}", peer_id.to_base58());
self.events.push_back(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.to_owned(),
event: message,
});
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: peer_id.to_owned(),
event: message,
handler: NotifyHandler::Any,
});
}
self.wanted_blocks.insert(cid, priority);
debug!("");
Expand All @@ -163,7 +172,11 @@ impl<TStrategy> Bitswap<TStrategy> {
let peer_id = peer_id.to_owned();
debug!(" queuing cancel for {}", peer_id.to_base58());
self.events
.push_back(NetworkBehaviourAction::SendEvent { peer_id, event });
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event,
handler: NotifyHandler::Any,
});
}
}
self.wanted_blocks.remove(cid);
Expand All @@ -185,25 +198,19 @@ impl<TStrategy: Strategy> NetworkBehaviour for Bitswap<TStrategy> {
Vec::new()
}

fn inject_connected(&mut self, peer_id: PeerId, cp: ConnectedPoint) {
debug!("bitswap: inject_connected");
debug!(" peer_id: {}", peer_id.to_base58());
debug!(" connected_point: {:?}", cp);
fn inject_connected(&mut self, peer_id: &PeerId) {
debug!("bitswap: inject_connected {}", peer_id);
let ledger = Ledger::new();
self.connected_peers.insert(peer_id.clone(), ledger);
self.send_want_list(peer_id);
debug!("");
self.send_want_list(peer_id.clone());
}

fn inject_disconnected(&mut self, peer_id: &PeerId, cp: ConnectedPoint) {
debug!("bitswap: inject_disconnected {:?}", cp);
debug!(" peer_id: {}", peer_id.to_base58());
debug!(" connected_point: {:?}", cp);
debug!("");
fn inject_disconnected(&mut self, peer_id: &PeerId) {
debug!("bitswap: inject_disconnected {:?}", peer_id);
//self.connected_peers.remove(peer_id);
}

fn inject_node_event(&mut self, source: PeerId, event: InnerMessage) {
fn inject_event(&mut self, source: PeerId, _connection: ConnectionId, event: InnerMessage) {
debug!("bitswap: inject_node_event");
debug!("{:?}", event);
let message = match event {
Expand Down Expand Up @@ -232,7 +239,7 @@ impl<TStrategy: Strategy> NetworkBehaviour for Bitswap<TStrategy> {
.process_want(source.clone(), cid.to_owned(), *priority);
}
// TODO: Remove cancelled `Want` events from the queue.
// TODO: Remove cancelled blocks from `SendEvent`.
// TODO: Remove cancelled blocks from `NotifyHandler`.
debug!("");
}

Expand All @@ -242,19 +249,33 @@ impl<TStrategy: Strategy> NetworkBehaviour for Bitswap<TStrategy> {
{
// TODO concat messages to same destination to reduce traffic.
if let Some(event) = self.events.pop_front() {
if let NetworkBehaviourAction::SendEvent { peer_id, event } = event {
if let NetworkBehaviourAction::NotifyHandler {
peer_id,
event,
handler,
} = event
{
match self.connected_peers.get_mut(&peer_id) {
None => {
debug!(" requeueing send event to {}", peer_id.to_base58());
debug!(" requeueing send event to {}", peer_id);
// FIXME: I wonder if this should be
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd make this FIXME a bit more descriptive

self.events
.push_back(NetworkBehaviourAction::SendEvent { peer_id, event })
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event,
handler,
})
}
Some(ref mut ledger) => {
// FIXME: this is a bit early to update stats as the block hasn't been sent
// to anywhere at this point.
ledger.update_outgoing_stats(&event);
debug!(" send_message to {}", peer_id.to_base58());
return Poll::Ready(NetworkBehaviourAction::SendEvent { peer_id, event });
debug!(" send_message to {}", peer_id);
return Poll::Ready(NetworkBehaviourAction::NotifyHandler {
peer_id,
event,
handler,
});
}
}
} else {
Expand Down
77 changes: 61 additions & 16 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ use futures::stream::Fuse;
pub use libipld::cid::Cid;
use libipld::cid::Codec;
pub use libipld::ipld::Ipld;
pub use libp2p::core::{ConnectedPoint, Multiaddr, PeerId, PublicKey};
pub use libp2p::core::{connection::ListenerId, ConnectedPoint, Multiaddr, PeerId, PublicKey};
pub use libp2p::identity::Keypair;

use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
Expand Down Expand Up @@ -245,6 +246,8 @@ enum IpfsEvent {
PubsubSubscribed(OneshotSender<Vec<String>>),
WantList(Option<PeerId>, OneshotSender<Vec<(Cid, bitswap::Priority)>>),
BitswapStats(OneshotSender<BitswapStats>),
AddListeningAddress(Multiaddr, Channel<()>),
RemoveListeningAddress(Multiaddr, Channel<()>),
Exit,
}

Expand Down Expand Up @@ -298,6 +301,7 @@ impl<Types: IpfsTypes> UninitializedIpfs<Types> {
repo_events: repo_events.fuse(),
from_facade: receiver.fuse(),
swarm,
listening_addresses: HashMap::new(),
};

let UninitializedIpfs {
Expand Down Expand Up @@ -538,6 +542,33 @@ impl<Types: IpfsTypes> Ipfs<Types> {
Ok(rx.await?)
}

/// Add a given multiaddr as a listening address. Will fail if the address is unsupported, or
/// if it is already being listened on. Currently will invoke `Swarm::listen_on` internally,
/// keep the ListenerId for later `remove_listening_address` use in a HashMap.
pub async fn add_listening_address(&self, addr: Multiaddr) -> Result<(), Error> {
let (tx, rx) = oneshot_channel();

self.to_task
.clone()
.send(IpfsEvent::AddListeningAddress(addr, tx))
.await?;

rx.await?
}

/// Stop listening on a previously added listening address. Fails if the address is not being
/// listened to.
pub async fn remove_listening_address(&self, addr: Multiaddr) -> Result<(), Error> {
let (tx, rx) = oneshot_channel();

self.to_task
.clone()
.send(IpfsEvent::RemoveListeningAddress(addr, tx))
.await?;

rx.await?
}

/// Exit daemon.
pub async fn exit_daemon(mut self) {
// FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
Expand All @@ -556,14 +587,15 @@ struct IpfsFuture<Types: SwarmTypes> {
swarm: TSwarm<Types>,
repo_events: Fuse<Receiver<RepoEvent>>,
from_facade: Fuse<Receiver<IpfsEvent>>,
listening_addresses: HashMap<Multiaddr, ListenerId>,
}

impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
type Output = ();

fn poll(mut self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
use futures::Stream;
use libp2p::{swarm::SwarmEvent, Swarm};
use libp2p::Swarm;

// begin by polling the swarm so that initially it'll first have chance to bind listeners
// and such. TODO: this no longer needs to be a swarm event but perhaps we should
Expand All @@ -573,7 +605,7 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {

loop {
loop {
let inner = {
let _inner = {
let next = self.swarm.next_event();
futures::pin_mut!(next);
match next.poll(ctx) {
Expand All @@ -583,19 +615,7 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
}
};
done = false;
match inner {
SwarmEvent::Behaviour(()) => {}
SwarmEvent::Connected(_peer_id) => {}
SwarmEvent::Disconnected(_peer_id) => {}
SwarmEvent::NewListenAddr(_addr) => {}
SwarmEvent::ExpiredListenAddr(_addr) => {}
SwarmEvent::UnreachableAddr {
peer_id: _peer_id,
address: _address,
error: _error,
} => {}
SwarmEvent::StartConnect(_peer_id) => {}
}
// the _inner can be useful for debugging
}

// temporary pinning of the receivers should be safe as we are pinning through the
Expand Down Expand Up @@ -676,6 +696,31 @@ impl<Types: SwarmTypes> Future for IpfsFuture<Types> {
let wantlist = self.swarm.bitswap().local_wantlist();
let _ = ret.send((stats, peers, wantlist).into());
}
IpfsEvent::AddListeningAddress(addr, ret) => {
let added = match Swarm::listen_on(&mut self.swarm, addr.clone()) {
Ok(id) => {
self.listening_addresses.insert(addr, id);
Ok(())
}
Err(e) => Err(Error::from(e)),
};

let _ = ret.send(added);
}
IpfsEvent::RemoveListeningAddress(addr, ret) => {
let removed = if let Some(id) = self.listening_addresses.remove(&addr) {
Swarm::remove_listener(&mut self.swarm, id).map_err(|_: ()| {
format_err!(
"Failed to remove previously added listening address: {}",
addr
)
})
} else {
Err(format_err!("Address was not listened to before: {}", addr))
};

let _ = ret.send(removed);
}
IpfsEvent::Exit => {
// FIXME: we could do a proper teardown
return Poll::Ready(());
Expand Down
6 changes: 3 additions & 3 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ impl<TSwarmTypes: SwarmTypes> NetworkBehaviourEventProcess<PingEvent> for Behavi
peer,
result: Result::Ok(PingSuccess::Pong),
} => {
log::trace!("ping: pong from {}", peer.to_base58());
log::trace!("ping: pong from {}", peer);
}
PingEvent {
peer,
result: Result::Err(PingFailure::Timeout),
} => {
log::trace!("ping: timeout to {}", peer.to_base58());
log::trace!("ping: timeout to {}", peer);
self.remove_peer(&peer);
}
PingEvent {
Expand Down Expand Up @@ -203,7 +203,7 @@ impl<TSwarmTypes: SwarmTypes> Behaviour<TSwarmTypes> {
}

pub fn connections(&self) -> Vec<Connection> {
self.swarm.connections().cloned().collect()
self.swarm.connections()
}

pub fn connect(&mut self, addr: Multiaddr) -> SubscriptionFuture<Result<(), String>> {
Expand Down
Loading