diff --git a/hole-punching-tests/Cargo.toml b/hole-punching-tests/Cargo.toml index 0f36dabfb6e..5d68b744c40 100644 --- a/hole-punching-tests/Cargo.toml +++ b/hole-punching-tests/Cargo.toml @@ -6,7 +6,7 @@ publish = false license = "MIT" [dependencies] -clap = { version = "4.3.8", features = ["derive"] } +clap = { version = "4.3.8", features = ["derive", "env"] } env_logger = "0.10.0" futures = "0.3.28" libp2p = { path = "../libp2p", features = ["tokio", "dcutr", "identify", "macros", "noise", "ping", "relay", "tcp", "yamux", "quic"] } diff --git a/hole-punching-tests/Dockerfile b/hole-punching-tests/Dockerfile index a66007036aa..10e17586583 100644 --- a/hole-punching-tests/Dockerfile +++ b/hole-punching-tests/Dockerfile @@ -12,9 +12,7 @@ RUN --mount=type=cache,target=./target \ cargo build --release --package hole-punching-tests --target x86_64-unknown-linux-musl RUN --mount=type=cache,target=./target \ - mv ./target/x86_64-unknown-linux-musl/release/hp_client /usr/local/bin/hp_client -RUN --mount=type=cache,target=./target \ - mv ./target/x86_64-unknown-linux-musl/release/relay /usr/local/bin/relay + mv ./target/x86_64-unknown-linux-musl/release/hole-punching-tests /usr/local/bin/hole-punching-tests FROM ubuntu:jammy @@ -23,7 +21,6 @@ RUN --mount=type=cache,target=/var/cache/apt apt-get update && apt-get install - # Debugging tools RUN --mount=type=cache,target=/var/cache/apt apt-get install -y tcpdump ncat iputils-ping -COPY --from=builder /usr/local/bin/hp_client /usr/bin/hp_client -COPY --from=builder /usr/local/bin/relay /usr/bin/relay +COPY --from=builder /usr/local/bin/hole-punching-tests /usr/bin/hole-punching-tests ENV RUST_BACKTRACE=1 diff --git a/hole-punching-tests/docker-compose.yml b/hole-punching-tests/docker-compose.yml index b00e8302a1d..25b35cf2d2c 100644 --- a/hole-punching-tests/docker-compose.yml +++ b/hole-punching-tests/docker-compose.yml @@ -9,7 +9,9 @@ services: context: .. container_name: relay init: true - command: /usr/bin/relay --listen-addr 17.0.0.10 + command: /usr/bin/hole-punching-tests relay + environment: + LISTEN_ADDR: 17.0.0.10 networks: internet: ipv4_address: 17.0.0.10 @@ -32,8 +34,9 @@ services: dockerfile: ./hole-punching-tests/Dockerfile context: .. container_name: alice - init: true - command: ["/bin/bash", "-c", "ip route add 17.0.0.0/16 via 192.168.0.10 dev eth0 && /usr/bin/hp_client --mode dial --transport tcp"] + command: ["/bin/bash", "-c", "ip route add 17.0.0.0/16 via 192.168.0.10 dev eth0 && /usr/bin/hole-punching-tests dial"] + environment: + TRANSPORT: tcp networks: alice_lan: ipv4_address: 192.168.0.11 @@ -57,8 +60,9 @@ services: dockerfile: ./hole-punching-tests/Dockerfile context: .. container_name: bob - init: true - command: ["/bin/bash", "-c", "ip route add 17.0.0.0/16 via 192.168.1.10 dev eth0 && /usr/bin/hp_client --mode listen --transport tcp"] + command: ["/bin/bash", "-c", "ip route add 17.0.0.0/16 via 192.168.1.10 dev eth0 && /usr/bin/hole-punching-tests listen"] + environment: + TRANSPORT: tcp networks: bob_lan: ipv4_address: 192.168.1.11 diff --git a/hole-punching-tests/src/bin/hp_client.rs b/hole-punching-tests/src/bin/hp_client.rs deleted file mode 100644 index 6fb9e21d2e2..00000000000 --- a/hole-punching-tests/src/bin/hp_client.rs +++ /dev/null @@ -1,259 +0,0 @@ -// Copyright 2021 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use anyhow::{Context, Result}; -use clap::Parser; -use futures::{future::Either, stream::StreamExt}; -use libp2p::{ - core::{ - multiaddr::{Multiaddr, Protocol}, - muxing::StreamMuxerBox, - transport::Transport, - upgrade, - }, - dcutr, identify, identity, noise, ping, quic, relay, - swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, - tcp, yamux, PeerId, Swarm, -}; -use log::{info, LevelFilter}; -use redis::AsyncCommands; -use std::collections::HashMap; -use std::net::Ipv4Addr; -use std::str::FromStr; - -#[derive(Debug, Parser)] -#[clap(name = "libp2p DCUtR client")] -struct Opts { - /// The mode (listening or dialing). - #[clap(long)] - mode: Mode, - - /// The transport (tcp or quic). - #[clap(long)] - transport: TransportProtocol, -} - -#[tokio::main] -async fn main() -> Result<()> { - env_logger::builder() - .filter_level(LevelFilter::Info) - .parse_default_env() - .init(); - - let opts = Opts::parse(); - - let client = redis::Client::open("redis://redis:6379")?; - let mut connection = client.get_async_connection().await?; - - let redis_key = match opts.transport { - TransportProtocol::Tcp => "RELAY_TCP_ADDRESS", - TransportProtocol::Quic => "RELAY_QUIC_ADDRESS", - }; - - let relay_address = connection - .blpop::<_, HashMap>(redis_key, 10) - .await? - .remove(redis_key) - .expect("key that we asked for to be present") - .parse::()?; - - let mut swarm = make_swarm()?; - - // Both parties must have a listener for the hole-punch to work. - let listen_addr = match opts.transport { - TransportProtocol::Tcp => Multiaddr::empty() - .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) - .with(Protocol::Tcp(0)), - TransportProtocol::Quic => Multiaddr::empty() - .with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED)) - .with(Protocol::Udp(0)) - .with(Protocol::QuicV1), - }; - let expected_listener_id = swarm - .listen_on(listen_addr) - .context("Failed to listen on address")?; - let mut listen_addresses = 0; - - // We should have at least two listen addresses, one for localhost and the actual interface. - while listen_addresses < 2 { - if let SwarmEvent::NewListenAddr { - listener_id, - address, - } = swarm.next().await.unwrap() - { - if listener_id == expected_listener_id { - listen_addresses += 1; - } - - info!("Listening on {address}"); - } - } - - swarm.dial(relay_address.clone())?; - - // Connect to the relay server. Not for the reservation or relayed connection, but to learn our local public address. - // FIXME: This should not be necessary. Perhaps dcutr should also already consider external address _candidates_? - loop { - if let SwarmEvent::Behaviour(BehaviourEvent::Identify(identify::Event::Received { - info: identify::Info { observed_addr, .. }, - .. - })) = swarm.next().await.unwrap() - { - info!("Relay told us our public address: {:?}", observed_addr); - swarm.add_external_address(observed_addr); - break; - } - } - - info!("Connected to the relay"); - - match opts.mode { - Mode::Dial => { - let remote_peer_id = connection - .blpop::<_, HashMap>("LISTEN_CLIENT_PEER_ID", 10) - .await? - .remove("LISTEN_CLIENT_PEER_ID") - .expect("key that we asked for to be present") - .parse()?; - - swarm.dial( - relay_address - .with(Protocol::P2pCircuit) - .with(Protocol::P2p(remote_peer_id)), - )?; - } - Mode::Listen => { - swarm.listen_on(relay_address.with(Protocol::P2pCircuit))?; - } - } - - loop { - match swarm.next().await.unwrap() { - SwarmEvent::Behaviour(BehaviourEvent::RelayClient( - relay::client::Event::ReservationReqAccepted { .. }, - )) => { - assert!(opts.mode == Mode::Listen); - info!("Relay accepted our reservation request."); - - connection - .rpush("LISTEN_CLIENT_PEER_ID", swarm.local_peer_id().to_string()) - .await?; - } - SwarmEvent::Behaviour(BehaviourEvent::RelayClient(event)) => { - info!("{:?}", event) - } - SwarmEvent::Behaviour(BehaviourEvent::Dcutr( - dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id }, - )) => { - info!("Successfully hole-punched to {remote_peer_id}"); - return Ok(()); - } - SwarmEvent::Behaviour(BehaviourEvent::Identify(_)) => {} - SwarmEvent::Behaviour(BehaviourEvent::Ping(_)) => {} - SwarmEvent::ConnectionEstablished { peer_id, .. } => { - info!("Connected to {peer_id}"); - } - SwarmEvent::OutgoingConnectionError { error, .. } => { - anyhow::bail!(error) - } - _ => {} - } - } -} - -fn make_swarm() -> Result> { - let local_key = identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - info!("Local peer id: {local_peer_id}"); - - let (relay_transport, client) = relay::client::new(local_peer_id); - - let transport = { - let relay_tcp_quic_transport = relay_transport - .or_transport(tcp::tokio::Transport::new( - tcp::Config::default().port_reuse(true), - )) - .upgrade(upgrade::Version::V1) - .authenticate(noise::Config::new(&local_key)?) - .multiplex(yamux::Config::default()) - .or_transport(quic::tokio::Transport::new(quic::Config::new(&local_key))); - - relay_tcp_quic_transport - .map(|either_output, _| match either_output { - Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - }) - .boxed() - }; - - let behaviour = Behaviour { - relay_client: client, - ping: ping::Behaviour::new(ping::Config::new()), - identify: identify::Behaviour::new(identify::Config::new( - "/TODO/0.0.1".to_string(), - local_key.public(), - )), - dcutr: dcutr::Behaviour::new(local_peer_id), - }; - - Ok(SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build()) -} - -#[derive(Clone, Debug, PartialEq, Parser)] -enum Mode { - Dial, - Listen, -} - -impl FromStr for Mode { - type Err = String; - fn from_str(mode: &str) -> Result { - match mode { - "dial" => Ok(Mode::Dial), - "listen" => Ok(Mode::Listen), - _ => Err("Expected either 'dial' or 'listen'".to_string()), - } - } -} - -#[derive(Clone, Debug, PartialEq, Parser)] -enum TransportProtocol { - Tcp, - Quic, -} - -impl FromStr for TransportProtocol { - type Err = String; - fn from_str(mode: &str) -> Result { - match mode { - "tcp" => Ok(TransportProtocol::Tcp), - "quic" => Ok(TransportProtocol::Quic), - _ => Err("Expected either 'tcp' or 'quic'".to_string()), - } - } -} - -#[derive(NetworkBehaviour)] -struct Behaviour { - relay_client: relay::client::Behaviour, - ping: ping::Behaviour, - identify: identify::Behaviour, - dcutr: dcutr::Behaviour, -} diff --git a/hole-punching-tests/src/bin/relay.rs b/hole-punching-tests/src/bin/relay.rs deleted file mode 100644 index 0823d016de7..00000000000 --- a/hole-punching-tests/src/bin/relay.rs +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright 2020 Parity Technologies (UK) Ltd. -// Copyright 2021 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use anyhow::Context; -use clap::Parser; -use futures::future::Either; -use futures::stream::StreamExt; -use libp2p::{ - core::multiaddr::Protocol, - core::muxing::StreamMuxerBox, - core::upgrade, - core::{Multiaddr, Transport}, - identify, identity, - identity::PeerId, - noise, ping, quic, relay, - swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, - tcp, yamux, -}; -use log::{info, LevelFilter}; -use redis::AsyncCommands; -use std::error::Error; -use std::net::IpAddr; - -#[tokio::main] -async fn main() -> Result<(), Box> { - env_logger::builder() - .filter_level(LevelFilter::Info) - .parse_default_env() - .init(); - - let opt = Opt::parse(); - - let local_key = identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - info!("Local peer id: {local_peer_id}"); - - let tcp_transport = tcp::tokio::Transport::default() - .upgrade(upgrade::Version::V1Lazy) - .authenticate( - noise::Config::new(&local_key).expect("Signing libp2p-noise static DH keypair failed."), - ) - .multiplex(yamux::Config::default()); - - let transport = quic::tokio::Transport::new(quic::Config::new(&local_key)) - .or_transport(tcp_transport) - .map(|either_output, _| match either_output { - Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), - }) - .boxed(); - - let behaviour = Behaviour { - relay: relay::Behaviour::new(local_peer_id, Default::default()), - ping: ping::Behaviour::new(ping::Config::new()), - identify: identify::Behaviour::new(identify::Config::new( - "/TODO/0.0.1".to_string(), - local_key.public(), - )), - }; - - let mut swarm = SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build(); - - // Listen on all interfaces - let listen_addr_tcp = Multiaddr::empty() - .with(opt.listen_addr.into()) - .with(Protocol::Tcp(0)); - let tcp_listener_id = swarm.listen_on(listen_addr_tcp)?; - - let listen_addr_quic = Multiaddr::empty() - .with(opt.listen_addr.into()) - .with(Protocol::Udp(0)) - .with(Protocol::QuicV1); - let quic_listener_id = swarm.listen_on(listen_addr_quic)?; - - let client = redis::Client::open("redis://redis:6379/")?; - let mut connection = client - .get_async_connection() - .await - .context("Failed to connect to redis server")?; - - loop { - match swarm.next().await.expect("Infinite Stream.") { - SwarmEvent::NewListenAddr { - address, - listener_id, - } => { - swarm.add_external_address(address.clone()); // We know that in our testing network setup, that we are listening on a "publicly-reachable" address. - - info!("Listening on {address}"); - - let address = address - .with(Protocol::P2p(*swarm.local_peer_id())) - .to_string(); - - // Push each address twice because we need to connect two clients. - - if listener_id == tcp_listener_id { - connection.rpush("RELAY_TCP_ADDRESS", &address).await?; - connection.rpush("RELAY_TCP_ADDRESS", &address).await?; - } - if listener_id == quic_listener_id { - connection.rpush("RELAY_QUIC_ADDRESS", &address).await?; - connection.rpush("RELAY_QUIC_ADDRESS", &address).await?; - } - } - _ => {} - } - } -} - -#[derive(NetworkBehaviour)] -struct Behaviour { - relay: relay::Behaviour, - ping: ping::Behaviour, - identify: identify::Behaviour, -} - -#[derive(Debug, Parser)] -#[clap(name = "libp2p relay")] -struct Opt { - /// Which local address to listen on - #[clap(long)] - listen_addr: IpAddr, -} diff --git a/hole-punching-tests/src/main.rs b/hole-punching-tests/src/main.rs new file mode 100644 index 00000000000..cc685d563f4 --- /dev/null +++ b/hole-punching-tests/src/main.rs @@ -0,0 +1,402 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use anyhow::{Context, Result}; +use clap::{Parser, Subcommand}; +use futures::{future::Either, stream::StreamExt}; +use libp2p::{ + core::{ + multiaddr::{Multiaddr, Protocol}, + muxing::StreamMuxerBox, + transport::Transport, + upgrade, + }, + dcutr, identify, identity, noise, quic, relay, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, + tcp, yamux, PeerId, Swarm, +}; +use redis::AsyncCommands; +use std::collections::HashMap; +use std::net::{IpAddr, Ipv4Addr}; +use std::str::FromStr; + +#[derive(Debug, Parser)] +struct Opts { + #[command(subcommand)] + command: Command, +} + +#[derive(Debug, Subcommand)] +enum Command { + Relay { + /// Which local address to listen on. + #[clap(env)] + listen_addr: IpAddr, + }, + Dial { + /// The transport (tcp or quic). + #[clap(env)] + transport: TransportProtocol, + }, + Listen { + /// The transport (tcp or quic). + #[clap(env)] + transport: TransportProtocol, + }, +} + +/// The redis key we push the relay's TCP listen address to. +const RELAY_TCP_ADDRESS: &str = "RELAY_TCP_ADDRESS"; +/// The redis key we push the relay's QUIC listen address to. +const RELAY_QUIC_ADDRESS: &str = "RELAY_QUIC_ADDRESS"; +/// The redis key we push the listen client's PeerId to. +const LISTEN_CLIENT_PEER_ID: &str = "LISTEN_CLIENT_PEER_ID"; + +#[tokio::main] +async fn main() -> Result<()> { + env_logger::builder() + .filter_level(log::LevelFilter::Info) + .parse_default_env() + .init(); + + let opts = Opts::parse(); + + let mut redis = RedisClient::new("redis", 6379).await?; + + match opts.command { + Command::Relay { listen_addr } => { + let mut swarm = make_relay_swarm()?; + + let tcp_listener_id = swarm.listen_on(tcp_addr(listen_addr))?; + let quic_listener_id = swarm.listen_on(quic_addr(listen_addr))?; + + loop { + if let SwarmEvent::NewListenAddr { + address, + listener_id, + } = swarm.next().await.expect("Infinite Stream.") + { + swarm.add_external_address(address.clone()); // We know that in our testing network setup, that we are listening on a "publicly-reachable" address. + + log::info!("Listening on {address}"); + + let address = address + .with(Protocol::P2p(*swarm.local_peer_id())) + .to_string(); + + // Push each address twice because we need to connect two clients. + + if listener_id == tcp_listener_id { + redis.push(RELAY_TCP_ADDRESS, &address).await?; + redis.push(RELAY_TCP_ADDRESS, &address).await?; + } + if listener_id == quic_listener_id { + redis.push(RELAY_QUIC_ADDRESS, &address).await?; + redis.push(RELAY_QUIC_ADDRESS, &address).await?; + } + } + } + } + Command::Dial { transport } => { + let relay_addr = match transport { + TransportProtocol::Tcp => redis.pop::(RELAY_TCP_ADDRESS).await?, + TransportProtocol::Quic => redis.pop::(RELAY_TCP_ADDRESS).await?, + }; + + let mut swarm = make_client_swarm()?; + client_listen_on_transport(&mut swarm, transport).await?; + client_connect_to_relay(&mut swarm, relay_addr.clone()).await?; + + let remote_peer_id = redis.pop(LISTEN_CLIENT_PEER_ID).await?; + + swarm.dial( + relay_addr + .with(Protocol::P2pCircuit) + .with(Protocol::P2p(remote_peer_id)), + )?; + + loop { + match swarm.next().await.unwrap() { + SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr( + dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id }, + )) => { + log::info!("Successfully hole-punched to {remote_peer_id}"); + return Ok(()); + } + SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr( + dcutr::Event::DirectConnectionUpgradeFailed { + remote_peer_id, + error, + }, + )) => { + log::info!("Failed to hole-punched to {remote_peer_id}"); + return Err(anyhow::Error::new(error)); + } + SwarmEvent::OutgoingConnectionError { error, .. } => { + anyhow::bail!(error) + } + _ => {} + } + } + } + Command::Listen { transport } => { + let relay_addr = match transport { + TransportProtocol::Tcp => redis.pop::(RELAY_TCP_ADDRESS).await?, + TransportProtocol::Quic => redis.pop::(RELAY_TCP_ADDRESS).await?, + }; + + let mut swarm = make_client_swarm()?; + client_listen_on_transport(&mut swarm, transport).await?; + client_connect_to_relay(&mut swarm, relay_addr.clone()).await?; + + swarm.listen_on(relay_addr.with(Protocol::P2pCircuit))?; + + loop { + match swarm.next().await.unwrap() { + SwarmEvent::Behaviour(ClientBehaviourEvent::RelayClient( + relay::client::Event::ReservationReqAccepted { .. }, + )) => { + log::info!("Relay accepted our reservation request."); + + redis + .push(LISTEN_CLIENT_PEER_ID, swarm.local_peer_id()) + .await?; + } + SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr( + dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id }, + )) => { + log::info!("Successfully hole-punched to {remote_peer_id}"); + return Ok(()); + } + SwarmEvent::Behaviour(ClientBehaviourEvent::Dcutr( + dcutr::Event::DirectConnectionUpgradeFailed { + remote_peer_id, + error, + }, + )) => { + log::info!("Failed to hole-punched to {remote_peer_id}"); + return Err(anyhow::Error::new(error)); + } + SwarmEvent::OutgoingConnectionError { error, .. } => { + anyhow::bail!(error) + } + _ => {} + } + } + } + } +} + +async fn client_connect_to_relay( + swarm: &mut Swarm, + relay_addr: Multiaddr, +) -> Result<()> { + // Connect to the relay server. + swarm.dial(relay_addr.clone())?; + + loop { + if let SwarmEvent::Behaviour(ClientBehaviourEvent::Identify(identify::Event::Received { + info: identify::Info { observed_addr, .. }, + .. + })) = swarm.next().await.unwrap() + { + log::info!("Relay told us our public address: {observed_addr}"); + swarm.add_external_address(observed_addr); + break; + } + } + + log::info!("Connected to the relay"); + Ok(()) +} + +async fn client_listen_on_transport( + swarm: &mut Swarm, + transport: TransportProtocol, +) -> Result<()> { + let listen_addr = match transport { + TransportProtocol::Tcp => tcp_addr(Ipv4Addr::UNSPECIFIED.into()), + TransportProtocol::Quic => quic_addr(Ipv4Addr::UNSPECIFIED.into()), + }; + let expected_listener_id = swarm + .listen_on(listen_addr) + .context("Failed to listen on address")?; + + let mut listen_addresses = 0; + + // We should have at least two listen addresses, one for localhost and the actual interface. + while listen_addresses < 2 { + if let SwarmEvent::NewListenAddr { + listener_id, + address, + } = swarm.next().await.unwrap() + { + if listener_id == expected_listener_id { + listen_addresses += 1; + } + + log::info!("Listening on {address}"); + } + } + Ok(()) +} + +fn tcp_addr(addr: IpAddr) -> Multiaddr { + Multiaddr::empty().with(addr.into()).with(Protocol::Tcp(0)) +} + +fn quic_addr(addr: IpAddr) -> Multiaddr { + Multiaddr::empty() + .with(addr.into()) + .with(Protocol::Udp(0)) + .with(Protocol::QuicV1) +} + +fn make_client_swarm() -> Result> { + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + log::info!("Local peer id: {local_peer_id}"); + + let (relay_transport, client) = relay::client::new(local_peer_id); + + let transport = { + let relay_tcp_quic_transport = relay_transport + .or_transport(tcp::tokio::Transport::new( + tcp::Config::default().port_reuse(true), + )) + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&local_key)?) + .multiplex(yamux::Config::default()) + .or_transport(quic::tokio::Transport::new(quic::Config::new(&local_key))); + + relay_tcp_quic_transport + .map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) + .boxed() + }; + + let behaviour = ClientBehaviour { + relay_client: client, + identify: identify::Behaviour::new(identify::Config::new( + "/hole-punch-tests/1".to_owned(), + local_key.public(), + )), + dcutr: dcutr::Behaviour::new(local_peer_id), + }; + + Ok(SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build()) +} + +fn make_relay_swarm() -> Result> { + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + log::info!("Local peer id: {local_peer_id}"); + + let transport = tcp::tokio::Transport::new(tcp::Config::default().port_reuse(true)) + .upgrade(upgrade::Version::V1) + .authenticate(noise::Config::new(&local_key)?) + .multiplex(yamux::Config::default()) + .or_transport(quic::tokio::Transport::new(quic::Config::new(&local_key))) + .map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) + .boxed(); + let behaviour = RelayBehaviour { + relay: relay::Behaviour::new(local_peer_id, relay::Config::default()), + identify: identify::Behaviour::new(identify::Config::new( + "/hole-punch-tests/1".to_owned(), + local_key.public(), + )), + }; + + Ok(SwarmBuilder::with_tokio_executor(transport, behaviour, local_peer_id).build()) +} + +struct RedisClient { + inner: redis::aio::Connection, +} + +impl RedisClient { + async fn new(host: &str, port: u16) -> Result { + let client = redis::Client::open(format!("redis://{host}:{port}/")) + .context("Bad redis server URL")?; + let connection = client + .get_async_connection() + .await + .context("Failed to connect to redis server")?; + + Ok(Self { inner: connection }) + } + + async fn push(&mut self, key: &str, value: impl ToString) -> Result<()> { + self.inner.rpush(key, value.to_string()).await?; + + Ok(()) + } + + async fn pop(&mut self, key: &str) -> Result + where + V: FromStr, + V::Err: std::error::Error + Send + Sync + 'static, + { + let value = self + .inner + .blpop::<_, HashMap>(key, 10) + .await? + .remove(key) + .expect("key that we asked for to be present") + .parse()?; + + Ok(value) + } +} + +#[derive(Clone, Copy, Debug, PartialEq, Parser)] +enum TransportProtocol { + Tcp, + Quic, +} + +impl FromStr for TransportProtocol { + type Err = String; + fn from_str(mode: &str) -> Result { + match mode { + "tcp" => Ok(TransportProtocol::Tcp), + "quic" => Ok(TransportProtocol::Quic), + _ => Err("Expected either 'tcp' or 'quic'".to_string()), + } + } +} + +#[derive(NetworkBehaviour)] +struct ClientBehaviour { + relay_client: relay::client::Behaviour, + identify: identify::Behaviour, + dcutr: dcutr::Behaviour, +} + +#[derive(NetworkBehaviour)] +struct RelayBehaviour { + relay: relay::Behaviour, + identify: identify::Behaviour, +}