Skip to content

Commit

Permalink
fix(quic): add support for reusing an existing socket for local dialing
Browse files Browse the repository at this point in the history
Tracked in #4259. Now if a listener supports loopback interfaces, when a remote address is also a loopback address, we reuse an existing listener.

Pull-Request: #4304.
  • Loading branch information
arsenron authored and thomaseizinger committed Aug 20, 2023
1 parent e468e7a commit d83b33f
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ libp2p-perf = { version = "0.2.0", path = "protocols/perf" }
libp2p-ping = { version = "0.43.0", path = "protocols/ping" }
libp2p-plaintext = { version = "0.40.0", path = "transports/plaintext" }
libp2p-pnet = { version = "0.23.0", path = "transports/pnet" }
libp2p-quic = { version = "0.9.1-alpha", path = "transports/quic" }
libp2p-quic = { version = "0.9.2-alpha", path = "transports/quic" }
libp2p-relay = { version = "0.16.1", path = "protocols/relay" }
libp2p-rendezvous = { version = "0.13.0", path = "protocols/rendezvous" }
libp2p-request-response = { version = "0.25.1", path = "protocols/request-response" }
Expand Down
7 changes: 7 additions & 0 deletions transports/quic/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 0.9.2-alpha

- Add support for reusing an existing socket when dialing localhost address.
See [PR 4304].

[PR 4304]: https://github.com/libp2p/rust-libp2p/pull/4304

## 0.9.1-alpha

- Allow listening on ipv4 and ipv6 separately.
Expand Down
2 changes: 1 addition & 1 deletion transports/quic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "libp2p-quic"
version = "0.9.1-alpha"
version = "0.9.2-alpha"
authors = ["Parity Technologies <[email protected]>"]
edition = "2021"
rust-version = { workspace = true }
Expand Down
28 changes: 21 additions & 7 deletions transports/quic/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use libp2p_core::{
use libp2p_identity::PeerId;
use socket2::{Domain, Socket, Type};
use std::collections::hash_map::{DefaultHasher, Entry};
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, UdpSocket};
use std::time::Duration;
Expand Down Expand Up @@ -155,9 +155,16 @@ impl<P: Provider> GenTransport<P> {
if l.is_closed {
return false;
}
let listen_addr = l.socket_addr();
SocketFamily::is_same(&listen_addr.ip(), &socket_addr.ip())
&& listen_addr.ip().is_loopback() == socket_addr.ip().is_loopback()
SocketFamily::is_same(&l.socket_addr().ip(), &socket_addr.ip())
})
.filter(|l| {
if socket_addr.ip().is_loopback() {
l.listening_addresses
.iter()
.any(|ip_addr| ip_addr.is_loopback())
} else {
true
}
})
.collect();
match listeners.len() {
Expand Down Expand Up @@ -428,6 +435,8 @@ struct Listener<P: Provider> {

/// The stream must be awaken after it has been closed to deliver the last event.
close_listener_waker: Option<Waker>,

listening_addresses: HashSet<IpAddr>,
}

impl<P: Provider> Listener<P> {
Expand All @@ -440,12 +449,14 @@ impl<P: Provider> Listener<P> {
) -> Result<Self, Error> {
let if_watcher;
let pending_event;
let mut listening_addresses = HashSet::new();
let local_addr = socket.local_addr()?;
if local_addr.ip().is_unspecified() {
if_watcher = Some(P::new_if_watcher()?);
pending_event = None;
} else {
if_watcher = None;
listening_addresses.insert(local_addr.ip());
let ma = socketaddr_to_multiaddr(&local_addr, version);
pending_event = Some(TransportEvent::NewAddress {
listener_id,
Expand All @@ -467,6 +478,7 @@ impl<P: Provider> Listener<P> {
is_closed: false,
pending_event,
close_listener_waker: None,
listening_addresses,
})
}

Expand Down Expand Up @@ -513,7 +525,8 @@ impl<P: Provider> Listener<P> {
if let Some(listen_addr) =
ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version)
{
log::debug!("New listen address: {}", listen_addr);
log::debug!("New listen address: {listen_addr}");
self.listening_addresses.insert(inet.addr());
return Poll::Ready(TransportEvent::NewAddress {
listener_id: self.listener_id,
listen_addr,
Expand All @@ -524,7 +537,8 @@ impl<P: Provider> Listener<P> {
if let Some(listen_addr) =
ip_to_listenaddr(&endpoint_addr, inet.addr(), self.version)
{
log::debug!("Expired listen address: {}", listen_addr);
log::debug!("Expired listen address: {listen_addr}");
self.listening_addresses.remove(&inet.addr());
return Poll::Ready(TransportEvent::AddressExpired {
listener_id: self.listener_id,
listen_addr,
Expand Down Expand Up @@ -730,7 +744,7 @@ fn socketaddr_to_multiaddr(socket_addr: &SocketAddr, version: ProtocolVersion) -

#[cfg(test)]
#[cfg(any(feature = "async-std", feature = "tokio"))]
mod test {
mod tests {
use futures::future::poll_fn;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

Expand Down
43 changes: 43 additions & 0 deletions transports/quic/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,49 @@ async fn write_after_peer_dropped_stream() {
stream_b.close().await.expect("Close failed.");
}

/// - A listens on 0.0.0.0:0
/// - B listens on 127.0.0.1:0
/// - A dials B
/// - Source port of A at B is the A's listen port
#[cfg(feature = "tokio")]
#[tokio::test]
async fn test_local_listener_reuse() {
let (_, mut a_transport) = create_default_transport::<quic::tokio::Provider>();
let (_, mut b_transport) = create_default_transport::<quic::tokio::Provider>();

a_transport
.listen_on(
ListenerId::next(),
"/ip4/0.0.0.0/udp/0/quic-v1".parse().unwrap(),
)
.unwrap();

// wait until a listener reports a loopback address
let a_listen_addr = 'outer: loop {
let ev = a_transport.next().await.unwrap();
let listen_addr = ev.into_new_address().unwrap();
for proto in listen_addr.iter() {
if let Protocol::Ip4(ip4) = proto {
if ip4.is_loopback() {
break 'outer listen_addr;
}
}
}
};
// If we do not poll until the end, `NewAddress` events may be `Ready` and `connect` function
// below will panic due to an unexpected event.
poll_fn(|cx| {
let mut pinned = Pin::new(&mut a_transport);
while pinned.as_mut().poll(cx).is_ready() {}
Poll::Ready(())
})
.await;

let b_addr = start_listening(&mut b_transport, "/ip4/127.0.0.1/udp/0/quic-v1").await;
let (_, send_back_addr, _) = connect(&mut b_transport, &mut a_transport, b_addr).await.0;
assert_eq!(send_back_addr, a_listen_addr);
}

async fn smoke<P: Provider>() {
let _ = env_logger::try_init();

Expand Down

0 comments on commit d83b33f

Please sign in to comment.