Skip to content

Commit

Permalink
feat(relay): propagate errors to Transport::{listen_on,dial}
Browse files Browse the repository at this point in the history
To make a reservation with a relay, a user calls `Swarm::listen_on` with an address of the relay, suffixed with a `/p2pcircuit` protocol. Similarly, to establish a circuit to another peer, a user needs to call `Swarm::dial` with such an address. Upon success, the `Swarm` then issues a `SwarmEvent::NewListenAddr` event in case of a successful reservation or a `SwarmEvent::ConnectionEstablished` in case of a successful connect.

The story is different for errors. Somewhat counterintuitively, the actual reason of an error during these operations are only reported as `relay::Event`s without a direct correlation to the user's `Swarm::listen_on` or `Swarm::dial` calls.

With this PR, we send these errors back "into" the `Transport` and report them as `SwarmEvent::ListenerClosed` or `SwarmEvent::OutgoingConnectionError`. This is conceptually more correct. Additionally, by sending these errors back to the transport, we no longer use `ConnectionHandlerEvent::Close` which entirely closes the underlying relay connection. In case the connection is not used for something else, it will be closed by the keep-alive algorithm.

Resolves: #4717.
Related: #3591.
Related: #4718.

Pull-Request: #4745.
  • Loading branch information
thomaseizinger committed Oct 31, 2023
1 parent d0f62e9 commit 6a8cef5
Show file tree
Hide file tree
Showing 19 changed files with 573 additions and 407 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ resolver = "2"
rust-version = "1.73.0"

[workspace.dependencies]
futures-bounded = { version = "0.2.0", path = "misc/futures-bounded" }
futures-bounded = { version = "0.2.1", path = "misc/futures-bounded" }
libp2p = { version = "0.53.0", path = "libp2p" }
libp2p-allow-block-list = { version = "0.3.0", path = "misc/allow-block-list" }
libp2p-autonat = { version = "0.12.0", path = "protocols/autonat" }
Expand Down
1 change: 1 addition & 0 deletions hole-punching-tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ redis = { version = "0.23.0", default-features = false, features = ["tokio-comp"
tokio = { version = "1.29.1", features = ["full"] }
serde = { version = "1.0.190", features = ["derive"] }
serde_json = "1.0.107"
either = "1.9.0"
58 changes: 48 additions & 10 deletions hole-punching-tests/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
// DEALINGS IN THE SOFTWARE.

use anyhow::{Context, Result};
use either::Either;
use futures::stream::StreamExt;
use libp2p::core::transport::ListenerId;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::ConnectionId;
use libp2p::{
core::multiaddr::{Multiaddr, Protocol},
dcutr, identify, noise, ping, relay,
Expand Down Expand Up @@ -83,17 +87,22 @@ async fn main() -> Result<()> {
.build();

client_listen_on_transport(&mut swarm, transport).await?;
client_setup(&mut swarm, &mut redis, relay_addr.clone(), mode).await?;
let id = client_setup(&mut swarm, &mut redis, relay_addr.clone(), mode).await?;

let mut hole_punched_peer_connection = None;

loop {
match (swarm.next().await.unwrap(), hole_punched_peer_connection) {
match (
swarm.next().await.unwrap(),
hole_punched_peer_connection,
id,
) {
(
SwarmEvent::Behaviour(BehaviourEvent::RelayClient(
relay::client::Event::ReservationReqAccepted { .. },
)),
_,
_,
) => {
log::info!("Relay accepted our reservation request.");

Expand All @@ -109,6 +118,7 @@ async fn main() -> Result<()> {
},
)),
_,
_,
) => {
log::info!("Successfully hole-punched to {remote_peer_id}");

Expand All @@ -121,6 +131,7 @@ async fn main() -> Result<()> {
..
})),
Some(hole_punched_connection),
_,
) if mode == Mode::Dial && connection == hole_punched_connection => {
println!("{}", serde_json::to_string(&Report::new(rtt))?);

Expand All @@ -135,12 +146,32 @@ async fn main() -> Result<()> {
},
)),
_,
_,
) => {
log::info!("Failed to hole-punched to {remote_peer_id}");
return Err(anyhow::Error::new(error));
}
(SwarmEvent::OutgoingConnectionError { error, .. }, _) => {
anyhow::bail!(error)
(
SwarmEvent::ListenerClosed {
listener_id,
reason: Err(e),
..
},
_,
Either::Left(reservation),
) if listener_id == reservation => {
anyhow::bail!("Reservation on relay failed: {e}");
}
(
SwarmEvent::OutgoingConnectionError {
connection_id,
error,
..
},
_,
Either::Right(circuit),
) if connection_id == circuit => {
anyhow::bail!("Circuit request relay failed: {error}");
}
_ => {}
}
Expand Down Expand Up @@ -209,23 +240,30 @@ async fn client_setup(
redis: &mut RedisClient,
relay_addr: Multiaddr,
mode: Mode,
) -> Result<()> {
match mode {
) -> Result<Either<ListenerId, ConnectionId>> {
let either = match mode {
Mode::Listen => {
swarm.listen_on(relay_addr.with(Protocol::P2pCircuit))?;
let id = swarm.listen_on(relay_addr.with(Protocol::P2pCircuit))?;

Either::Left(id)
}
Mode::Dial => {
let remote_peer_id = redis.pop(LISTEN_CLIENT_PEER_ID).await?;

swarm.dial(
let opts = DialOpts::from(
relay_addr
.with(Protocol::P2pCircuit)
.with(Protocol::P2p(remote_peer_id)),
)?;
);
let id = opts.connection_id();

swarm.dial(opts)?;

Either::Right(id)
}
};

Ok(())
Ok(either)
}

fn tcp_addr(addr: IpAddr) -> Multiaddr {
Expand Down
5 changes: 5 additions & 0 deletions misc/futures-bounded/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.2.1 - unreleased

- Add `.len()` getter to `FuturesMap`, `FuturesSet`, `StreamMap` and `StreamSet`.
See [PR 4745](https://github.com/libp2p/rust-lib2pp/pulls/4745).

## 0.2.0

- Add `StreamMap` type and remove `Future`-suffix from `PushError::ReplacedFuture` to reuse it for `StreamMap`.
Expand Down
2 changes: 1 addition & 1 deletion misc/futures-bounded/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "futures-bounded"
version = "0.2.0"
version = "0.2.1"
edition = "2021"
rust-version.workspace = true
license = "MIT"
Expand Down
4 changes: 4 additions & 0 deletions misc/futures-bounded/src/futures_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ where
}
}

pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
Expand Down
4 changes: 4 additions & 0 deletions misc/futures-bounded/src/futures_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ impl<O> FuturesSet<O> {
}
}

pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
Expand Down
6 changes: 5 additions & 1 deletion misc/futures-bounded/src/stream_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ where
Some(inner)
}

pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
Expand Down Expand Up @@ -256,7 +260,7 @@ mod tests {

assert!(poll.is_pending());
assert_eq!(
streams.inner.len(),
streams.len(),
0,
"resources of cancelled streams are cleaned up properly"
);
Expand Down
4 changes: 4 additions & 0 deletions misc/futures-bounded/src/stream_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ where
}
}

pub fn len(&self) -> usize {
self.inner.len()
}

pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
Expand Down
11 changes: 11 additions & 0 deletions protocols/relay/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@

- Fix a rare race condition when making a reservation on a relay that could lead to a failed reservation.
See [PR 4747](https://github.com/libp2p/rust-lib2pp/pulls/4747).
- Propagate errors of relay client to the listener / dialer.
A failed reservation will now appear as `SwarmEvent::ListenerClosed` with the `ListenerId` of the corresponding `Swarm::listen_on` call.
A failed circuit request will now appear as `SwarmEvent::OutgoingConnectionError` with the `ConnectionId` of the corresponding `Swarm::dial` call.
Lastly, a failed reservation or circuit request will **no longer** close the underlying relay connection.
As a result, we remove the following enum variants:
- `relay::client::Event::ReservationReqFailed`
- `relay::client::Event::OutboundCircuitReqFailed`
- `relay::client::Event::InboundCircuitReqDenied`
- `relay::client::Event::InboundCircuitReqDenyFailed`

See [PR 4745](https://github.com/libp2p/rust-lib2pp/pulls/4745).

## 0.16.2

Expand Down
1 change: 1 addition & 0 deletions protocols/relay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ libp2p-plaintext = { workspace = true }
libp2p-swarm = { workspace = true, features = ["macros", "async-std"] }
libp2p-yamux = { workspace = true }
quickcheck = { workspace = true }
libp2p-swarm-test = { workspace = true }

# Passing arguments to the docsrs builder in order to properly document cfg's.
# More information: https://docs.rs/about/builds#cross-compiling
Expand Down
5 changes: 1 addition & 4 deletions protocols/relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,12 @@ pub mod inbound {
pub mod hop {
pub use crate::protocol::inbound_hop::FatalUpgradeError;
}
pub mod stop {
pub use crate::protocol::inbound_stop::FatalUpgradeError;
}
}

/// Types related to the relay protocol outbound.
pub mod outbound {
pub mod hop {
pub use crate::protocol::outbound_hop::FatalUpgradeError;
pub use crate::protocol::outbound_hop::{ConnectError, ProtocolViolation, ReserveError};
}
pub mod stop {
pub use crate::protocol::outbound_stop::FatalUpgradeError;
Expand Down
43 changes: 4 additions & 39 deletions protocols/relay/src/priv_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) mod transport;

use crate::multiaddr_ext::MultiaddrExt;
use crate::priv_client::handler::Handler;
use crate::protocol::{self, inbound_stop, outbound_hop};
use crate::protocol::{self, inbound_stop};
use bytes::Bytes;
use either::Either;
use futures::channel::mpsc::Receiver;
Expand All @@ -39,8 +39,7 @@ use libp2p_swarm::behaviour::{ConnectionClosed, ConnectionEstablished, FromSwarm
use libp2p_swarm::dial_opts::DialOpts;
use libp2p_swarm::{
dummy, ConnectionDenied, ConnectionHandler, ConnectionId, DialFailure, NetworkBehaviour,
NotifyHandler, Stream, StreamUpgradeError, THandler, THandlerInEvent, THandlerOutEvent,
ToSwarm,
NotifyHandler, Stream, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
};
use std::collections::{hash_map, HashMap, VecDeque};
use std::io::{Error, ErrorKind, IoSlice};
Expand All @@ -59,32 +58,15 @@ pub enum Event {
renewal: bool,
limit: Option<protocol::Limit>,
},
ReservationReqFailed {
relay_peer_id: PeerId,
/// Indicates whether the request replaces an existing reservation.
renewal: bool,
error: StreamUpgradeError<outbound_hop::ReservationFailedReason>,
},
OutboundCircuitEstablished {
relay_peer_id: PeerId,
limit: Option<protocol::Limit>,
},
OutboundCircuitReqFailed {
relay_peer_id: PeerId,
error: StreamUpgradeError<outbound_hop::CircuitFailedReason>,
},
/// An inbound circuit has been established.
InboundCircuitEstablished {
src_peer_id: PeerId,
limit: Option<protocol::Limit>,
},
/// An inbound circuit request has been denied.
InboundCircuitReqDenied { src_peer_id: PeerId },
/// Denying an inbound circuit request failed.
InboundCircuitReqDenyFailed {
src_peer_id: PeerId,
error: inbound_stop::UpgradeError,
},
}

/// [`NetworkBehaviour`] implementation of the relay client
Expand Down Expand Up @@ -252,32 +234,15 @@ impl NetworkBehaviour for Behaviour {
limit,
}
}
handler::Event::ReservationReqFailed { renewal, error } => {
Event::ReservationReqFailed {
relay_peer_id: event_source,
renewal,
error,
}
}
handler::Event::OutboundCircuitEstablished { limit } => {
Event::OutboundCircuitEstablished {
relay_peer_id: event_source,
limit,
}
}
handler::Event::OutboundCircuitReqFailed { error } => Event::OutboundCircuitReqFailed {
relay_peer_id: event_source,
error,
},
handler::Event::InboundCircuitEstablished { src_peer_id, limit } => {
Event::InboundCircuitEstablished { src_peer_id, limit }
}
handler::Event::InboundCircuitReqDenied { src_peer_id } => {
Event::InboundCircuitReqDenied { src_peer_id }
}
handler::Event::InboundCircuitReqDenyFailed { src_peer_id, error } => {
Event::InboundCircuitReqDenyFailed { src_peer_id, error }
}
};

self.queued_actions.push_back(ToSwarm::GenerateEvent(event))
Expand Down Expand Up @@ -336,7 +301,7 @@ impl NetworkBehaviour for Behaviour {
peer_id: relay_peer_id,
handler: NotifyHandler::One(*connection_id),
event: Either::Left(handler::In::EstablishCircuit {
send_back,
to_dial: send_back,
dst_peer_id,
}),
},
Expand All @@ -350,7 +315,7 @@ impl NetworkBehaviour for Behaviour {
self.pending_handler_commands.insert(
connection_id,
handler::In::EstablishCircuit {
send_back,
to_dial: send_back,
dst_peer_id,
},
);
Expand Down
Loading

0 comments on commit 6a8cef5

Please sign in to comment.