Skip to content

Commit

Permalink
Merge pull request #19 from elenaf9/quic/multiple-endpoints-2
Browse files Browse the repository at this point in the history
transports/quic: Refactor listener handling (updated)
  • Loading branch information
kpp authored Aug 27, 2022
2 parents 8aff243 + bf06d96 commit e0684ab
Show file tree
Hide file tree
Showing 118 changed files with 3,239 additions and 2,531 deletions.
12 changes: 6 additions & 6 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand Down Expand Up @@ -52,7 +52,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand Down Expand Up @@ -87,7 +87,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand All @@ -109,7 +109,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand All @@ -135,7 +135,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand All @@ -157,7 +157,7 @@ jobs:
steps:

- name: Cancel Previous Runs
uses: styfle/cancel-workflow-action@a40b8845c0683271d9f53dfcb887a7e181d3918b # 0.9.1
uses: styfle/cancel-workflow-action@bb6001c4ea612bf59c3abfc4756fbceee4f870c7 # 0.10.0
with:
access_token: ${{ github.token }}

Expand Down
30 changes: 29 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,35 @@

# `libp2p` facade crate

# 0.46.0 [unreleased]
# 0.47.0 [unreleased]

- Update to [`libp2p-dcutr` `v0.5.0`](protocols/dcutr/CHANGELOG.md#050).

- Update to [`libp2p-rendezvous` `v0.8.0`](protocols/rendezvous/CHANGELOG.md#080).

- Update to [`libp2p-ping` `v0.38.0`](protocols/ping/CHANGELOG.md#0380).

- Update to [`libp2p-identify` `v0.38.0`](protocols/identify/CHANGELOG.md#0380).

- Update to [`libp2p-floodsub` `v0.38.0`](protocols/floodsub/CHANGELOG.md#0380).

- Update to [`libp2p-relay` `v0.11.0`](protocols/relay/CHANGELOG.md#0110).

- Update to [`libp2p-metrics` `v0.8.0`](misc/metrics/CHANGELOG.md#080).

- Update to [`libp2p-kad` `v0.39.0`](protocols/kad/CHANGELOG.md#0390).

- Update to [`libp2p-autonat` `v0.6.0`](protocols/autonat/CHANGELOG.md#060).

- Update to [`libp2p-request-response` `v0.20.0`](protocols/request-response/CHANGELOG.md#0200).

- Update to [`libp2p-swarm` `v0.38.0`](swarm/CHANGELOG.md#0380).

# 0.46.1

- Update to `libp2p-derive` [`v0.28.0`](swarm-derive/CHANGELOG.md#0280).

# 0.46.0

- Semver bump Rust from `1.56.1` to `1.60.0` . See [PR 2646].
- Added weak dependencies for features. See [PR 2646].
Expand Down
29 changes: 15 additions & 14 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ default = [
"websocket",
"yamux",
]

autonat = ["dep:libp2p-autonat"]
dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"]
deflate = ["dep:libp2p-deflate"]
Expand Down Expand Up @@ -78,23 +79,23 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` feature
instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature
lazy_static = "1.2"

libp2p-autonat = { version = "0.5.0", path = "protocols/autonat", optional = true }
libp2p-autonat = { version = "0.6.0", path = "protocols/autonat", optional = true }
libp2p-core = { version = "0.34.0", path = "core", default-features = false }
libp2p-dcutr = { version = "0.4.0", path = "protocols/dcutr", optional = true }
libp2p-floodsub = { version = "0.37.0", path = "protocols/floodsub", optional = true }
libp2p-identify = { version = "0.37.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.38.0", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.7.0", path = "misc/metrics", optional = true }
libp2p-dcutr = { version = "0.5.0", path = "protocols/dcutr", optional = true }
libp2p-floodsub = { version = "0.38.0", path = "protocols/floodsub", optional = true }
libp2p-identify = { version = "0.38.0", path = "protocols/identify", optional = true }
libp2p-kad = { version = "0.39.0", path = "protocols/kad", optional = true }
libp2p-metrics = { version = "0.8.0", path = "misc/metrics", optional = true }
libp2p-mplex = { version = "0.34.0", path = "muxers/mplex", optional = true }
libp2p-noise = { version = "0.37.0", path = "transports/noise", optional = true }
libp2p-ping = { version = "0.37.0", path = "protocols/ping", optional = true }
libp2p-ping = { version = "0.38.0", path = "protocols/ping", optional = true }
libp2p-plaintext = { version = "0.34.0", path = "transports/plaintext", optional = true }
libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true }
libp2p-relay = { version = "0.10.0", path = "protocols/relay", optional = true }
libp2p-rendezvous = { version = "0.7.0", path = "protocols/rendezvous", optional = true }
libp2p-request-response = { version = "0.19.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.37.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.27.0", path = "swarm-derive" }
libp2p-relay = { version = "0.11.0", path = "protocols/relay", optional = true }
libp2p-rendezvous = { version = "0.8.0", path = "protocols/rendezvous", optional = true }
libp2p-request-response = { version = "0.20.0", path = "protocols/request-response", optional = true }
libp2p-swarm = { version = "0.38.0", path = "swarm" }
libp2p-swarm-derive = { version = "0.28.0", path = "swarm-derive" }
libp2p-uds = { version = "0.33.0", path = "transports/uds", optional = true }
libp2p-wasm-ext = { version = "0.34.0", path = "transports/wasm-ext", default-features = false, optional = true }
libp2p-yamux = { version = "0.38.0", path = "muxers/yamux", optional = true }
Expand All @@ -107,13 +108,13 @@ smallvec = "1.6.1"
[target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies]
libp2p-deflate = { version = "0.34.0", path = "transports/deflate", optional = true }
libp2p-dns = { version = "0.34.0", path = "transports/dns", optional = true, default-features = false }
libp2p-mdns = { version = "0.38.0", path = "protocols/mdns", optional = true }
libp2p-mdns = { version = "0.39.0", path = "protocols/mdns", optional = true }
libp2p-quic = { version = "0.7.0", path = "transports/quic", optional = true }
libp2p-tcp = { version = "0.34.0", path = "transports/tcp", default-features = false, optional = true }
libp2p-websocket = { version = "0.36.0", path = "transports/websocket", optional = true }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
libp2p-gossipsub = { version = "0.39.0", path = "protocols/gossipsub", optional = true }
libp2p-gossipsub = { version = "0.40.0", path = "protocols/gossipsub", optional = true }

[dev-dependencies]
async-std = { version = "1.6.2", features = ["attributes"] }
Expand Down
9 changes: 8 additions & 1 deletion core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
# 0.34.0 - unreleased
# 0.34.0

- Introduce `StreamMuxerEvent::map_inbound_stream`. See [PR 2691].
- Remove `{read,write,flush,shutdown,destroy}_substream` functions from `StreamMuxer` trait
in favor of forcing `StreamMuxer::Substream` to implement `AsyncRead + AsyncWrite`. See [PR 2707].
- Replace `Into<std::io::Error>` bound on `StreamMuxer::Error` with `std::error::Error`. See [PR 2710].

- Remove the concept of individual `Transport::Listener` streams from `Transport`.
Instead the `Transport` is polled directly via `Transport::poll`. The
`Transport` is now responsible for driving its listeners. See [PR 2652].

[PR 2691]: https://github.com/libp2p/rust-libp2p/pull/2691
[PR 2707]: https://github.com/libp2p/rust-libp2p/pull/2707
[PR 2710]: https://github.com/libp2p/rust-libp2p/pull/2710
[PR 2652]: https://github.com/libp2p/rust-libp2p/pull/2652

# 0.33.0

Expand Down
19 changes: 0 additions & 19 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,6 @@ impl std::ops::Add<usize> for ConnectionId {
}
}

/// The ID of a single listener.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct ListenerId(u64);

impl ListenerId {
/// Creates a `ListenerId` from a non-negative integer.
pub fn new(id: u64) -> Self {
Self(id)
}
}

impl std::ops::Add<u64> for ListenerId {
type Output = Self;

fn add(self, other: u64) -> Self {
Self(self.0 + other)
}
}

/// The endpoint roles associated with a peer-to-peer communication channel.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
Expand Down
122 changes: 53 additions & 69 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::{
muxing::{StreamMuxer, StreamMuxerEvent},
transport::{ListenerEvent, Transport, TransportError},
transport::{ListenerId, Transport, TransportError, TransportEvent},
Multiaddr, ProtocolName,
};
use futures::{
Expand Down Expand Up @@ -203,7 +203,7 @@ where
{
type Substream = EitherOutput<A::Substream, B::Substream>;
type OutboundSubstream = EitherOutbound<A, B>;
type Error = io::Error;
type Error = EitherError<A::Error, B::Error>;

fn poll_event(
&self,
Expand All @@ -212,11 +212,11 @@ where
match self {
EitherOutput::First(inner) => inner
.poll_event(cx)
.map_err(|e| e.into())
.map_err(EitherError::A)
.map_ok(|event| event.map_inbound_stream(EitherOutput::First)),
EitherOutput::Second(inner) => inner
.poll_event(cx)
.map_err(|e| e.into())
.map_err(EitherError::B)
.map_ok(|event| event.map_inbound_stream(EitherOutput::Second)),
}
}
Expand All @@ -237,11 +237,11 @@ where
(EitherOutput::First(ref inner), EitherOutbound::A(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::First))
.map_err(|e| e.into()),
.map_err(EitherError::A),
(EitherOutput::Second(ref inner), EitherOutbound::B(ref mut substream)) => inner
.poll_outbound(cx, substream)
.map(|p| p.map(EitherOutput::Second))
.map_err(|e| e.into()),
.map_err(EitherError::B),
_ => panic!("Wrong API usage"),
}
}
Expand All @@ -261,8 +261,8 @@ where

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(|e| e.into()),
EitherOutput::First(inner) => inner.poll_close(cx).map_err(EitherError::A),
EitherOutput::Second(inner) => inner.poll_close(cx).map_err(EitherError::B),
}
}
}
Expand All @@ -274,48 +274,6 @@ pub enum EitherOutbound<A: StreamMuxer, B: StreamMuxer> {
B(B::OutboundSubstream),
}

/// Implements `Stream` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherListenStreamProj)]
#[derive(Debug, Copy, Clone)]
#[must_use = "futures do nothing unless polled"]
pub enum EitherListenStream<A, B> {
First(#[pin] A),
Second(#[pin] B),
}

impl<AStream, BStream, AInner, BInner, AError, BError> Stream
for EitherListenStream<AStream, BStream>
where
AStream: TryStream<Ok = ListenerEvent<AInner, AError>, Error = AError>,
BStream: TryStream<Ok = ListenerEvent<BInner, BError>, Error = BError>,
{
type Item = Result<
ListenerEvent<EitherFuture<AInner, BInner>, EitherError<AError, BError>>,
EitherError<AError, BError>,
>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.project() {
EitherListenStreamProj::First(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::First)
.map_err(EitherError::A)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::A(err)))),
},
EitherListenStreamProj::Second(a) => match TryStream::try_poll_next(a, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(le))) => Poll::Ready(Some(Ok(le
.map(EitherFuture::Second)
.map_err(EitherError::B)))),
Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(EitherError::B(err)))),
},
}
}
}

/// Implements `Future` and dispatches all method calls to either `First` or `Second`.
#[pin_project(project = EitherFutureProj)]
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -385,11 +343,12 @@ impl<A: ProtocolName, B: ProtocolName> ProtocolName for EitherName<A, B> {
}
}
}

#[derive(Debug, Copy, Clone)]
#[pin_project(project = EitherTransportProj)]
#[derive(Debug)]
#[must_use = "transports do nothing unless polled"]
pub enum EitherTransport<A, B> {
Left(A),
Right(B),
Left(#[pin] A),
Right(#[pin] B),
}

impl<A, B> Transport for EitherTransport<A, B>
Expand All @@ -399,29 +358,54 @@ where
{
type Output = EitherOutput<A::Output, B::Output>;
type Error = EitherError<A::Error, B::Error>;
type Listener = EitherListenStream<A::Listener, B::Listener>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;

fn listen_on(
&mut self,
addr: Multiaddr,
) -> Result<Self::Listener, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => match a.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::First(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::A(err))),
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
match self.project() {
EitherTransportProj::Left(a) => match a.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::First)
.map_err(EitherError::A),
),
},
EitherTransport::Right(b) => match b.listen_on(addr) {
Ok(listener) => Ok(EitherListenStream::Second(listener)),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(EitherError::B(err))),
EitherTransportProj::Right(b) => match b.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::Second)
.map_err(EitherError::B),
),
},
}
}

fn remove_listener(&mut self, id: ListenerId) -> bool {
match self {
EitherTransport::Left(t) => t.remove_listener(id),
EitherTransport::Right(t) => t.remove_listener(id),
}
}

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
use TransportError::*;
match self {
EitherTransport::Left(a) => a.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::A(err)),
}),
EitherTransport::Right(b) => b.listen_on(addr).map_err(|e| match e {
MultiaddrNotSupported(addr) => MultiaddrNotSupported(addr),
Other(err) => Other(EitherError::B(err)),
}),
}
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
use TransportError::*;
match self {
Expand Down
Loading

0 comments on commit e0684ab

Please sign in to comment.