Skip to content

Commit

Permalink
Allow accept/reject/retry before handshake begins
Browse files Browse the repository at this point in the history
This commit removes use_retry from the server config and provides a
public API for the user to manually accept/reject/retry incoming
connections before a handshake begins, and inspect properties such as
an incoming connection's remote address and whether that address is
validated when doing so.

In quinn-proto, Incoming is made public, as well as Endpoint's accept/
reject/retry methods which operate on it. The
DatagramEvent::NewConnection event is modified to return an incoming
but not yet accepted connection.

In quinn, awaiting Endpoint::accept now yields a new
quinn::Incoming type, rather than quinn::Connecting. The new
quinn::Incoming type has all the methods its quinn_proto equivalent has,
as well as an accept method to (fallibly) transition it into a
Connecting, and also reject, retry, and ignore methods.

Furthermore, quinn::Incoming implements IntoFuture with the output type
Result<Connection, ConnectionError>>, which is the same as the Future
output type of Connecting. This lets server code which was
straightforwardly awaiting the result of quinn::Endpoint::accept work
with little to no modification.

The test accept_after_close was removed because the functionality it
was testing for no longer exists.
  • Loading branch information
gretchenfrage committed Mar 29, 2024
1 parent da4b818 commit 1ed2fb1
Show file tree
Hide file tree
Showing 11 changed files with 531 additions and 154 deletions.
2 changes: 1 addition & 1 deletion perf/src/bin/perf_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async fn run(opt: Opt) -> Result<()> {
Ok(())
}

async fn handle(handshake: quinn::Connecting, opt: Arc<Opt>) -> Result<()> {
async fn handle(handshake: quinn::Incoming, opt: Arc<Opt>) -> Result<()> {
let connection = handshake.await.context("handshake failed")?;
debug!("{} connected", connection.remote_address());
tokio::try_join!(
Expand Down
14 changes: 0 additions & 14 deletions quinn-proto/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -741,10 +741,6 @@ pub struct ServerConfig {
/// Used to generate one-time AEAD keys to protect handshake tokens
pub(crate) token_key: Arc<dyn HandshakeTokenKey>,

/// Whether to require clients to prove ownership of an address before committing resources.
///
/// Introduces an additional round-trip to the handshake to make denial of service attacks more difficult.
pub(crate) use_retry: bool,
/// Microseconds after a stateless retry token was issued for which it's considered valid.
pub(crate) retry_token_lifetime: Duration,

Expand All @@ -766,7 +762,6 @@ impl ServerConfig {
crypto,

token_key,
use_retry: false,
retry_token_lifetime: Duration::from_secs(15),

migration: true,
Expand All @@ -785,14 +780,6 @@ impl ServerConfig {
self
}

/// Whether to require clients to prove ownership of an address before committing resources.
///
/// Introduces an additional round-trip to the handshake to make denial of service attacks more difficult.
pub fn use_retry(&mut self, value: bool) -> &mut Self {
self.use_retry = value;
self
}

/// Duration after a stateless retry token was issued for which it's considered valid.
pub fn retry_token_lifetime(&mut self, value: Duration) -> &mut Self {
self.retry_token_lifetime = value;
Expand Down Expand Up @@ -844,7 +831,6 @@ impl fmt::Debug for ServerConfig {
.field("transport", &self.transport)
.field("crypto", &"ServerConfig { elided }")
.field("token_key", &"[ elided ]")
.field("use_retry", &self.use_retry)
.field("retry_token_lifetime", &self.retry_token_lifetime)
.field("migration", &self.migration)
.finish()
Expand Down
106 changes: 84 additions & 22 deletions quinn-proto/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl Endpoint {

return match first_decode.finish(Some(&*crypto.header.remote)) {
Ok(packet) => {
self.handle_first_packet(now, addresses, ecn, packet, remaining, crypto, buf)
self.handle_first_packet(addresses, ecn, packet, remaining, crypto, buf)
}
Err(e) => {
trace!("unable to decode initial packet: {}", e);
Expand Down Expand Up @@ -412,7 +412,6 @@ impl Endpoint {

fn handle_first_packet(
&mut self,
now: Instant,
addresses: FourTuple,
ecn: Option<EcnCodepoint>,
mut packet: Packet,
Expand Down Expand Up @@ -478,7 +477,7 @@ impl Endpoint {
}
};

let incoming = Incoming {
Some(DatagramEvent::NewConnection(Incoming {
addresses,
ecn,
packet,
Expand All @@ -490,24 +489,31 @@ impl Endpoint {
version,
retry_src_cid,
orig_dst_cid,
};
if server_config.use_retry && !incoming.remote_address_validated() {
Some(DatagramEvent::Response(self.retry(incoming, buf)))
} else {
match self.accept(incoming, now, buf) {
Ok((ch, conn)) => Some(DatagramEvent::NewConnection(ch, conn)),
Err((_, response)) => response.map(DatagramEvent::Response),
}
}
}))
}

/// Attempt to accept this incoming connection (an error may still occur)
fn accept(
pub fn accept(
&mut self,
incoming: Incoming,
now: Instant,
buf: &mut BytesMut,
) -> Result<(ConnectionHandle, Connection), (ConnectionError, Option<Transmit>)> {
) -> Result<(ConnectionHandle, Connection), AcceptError> {
if self.cids_exhausted() {
debug!("refusing connection");
return Err(AcceptError {
cause: ConnectionError::CidsExhausted,
response: Some(self.initial_close(
incoming.version,
incoming.addresses,
&incoming.crypto,
&incoming.src_cid,
TransportError::CONNECTION_REFUSED(""),
buf,
)),
});
}

let server_config = self.server_config.as_ref().unwrap().clone();

let ch = ConnectionHandle(self.connections.vacant_key());
Expand Down Expand Up @@ -570,7 +576,7 @@ impl Endpoint {
)),
_ => None,
};
Err((e, response))
Err(AcceptError { cause: e, response })
}
}
}
Expand Down Expand Up @@ -604,8 +610,29 @@ impl Endpoint {
Ok(())
}

/// Reject this incoming connection attempt
pub fn reject(&mut self, incoming: Incoming, buf: &mut BytesMut) -> Transmit {
self.initial_close(
incoming.version,
incoming.addresses,
&incoming.crypto,
&incoming.src_cid,
TransportError::CONNECTION_REFUSED(""),
buf,
)
}

/// Respond with a retry packet, requiring the client to retry with address validation
fn retry(&mut self, incoming: Incoming, buf: &mut BytesMut) -> Transmit {
///
/// Errors if `incoming.remote_address_validated()` is true.
pub fn retry(
&mut self,
incoming: Incoming,
buf: &mut BytesMut,
) -> Result<Transmit, RetryError> {
if incoming.remote_address_validated() {
return Err(RetryError(incoming));
}
let server_config = self.server_config.as_ref().unwrap();

// First Initial
Expand Down Expand Up @@ -644,13 +671,13 @@ impl Endpoint {
));
encode.finish(buf, &*incoming.crypto.header.local, None);

Transmit {
Ok(Transmit {
destination: incoming.addresses.remote,
ecn: None,
size: buf.len(),
segment_size: None,
src_ip: incoming.addresses.local_ip,
}
})
}

fn add_connection(
Expand Down Expand Up @@ -925,14 +952,14 @@ impl IndexMut<ConnectionHandle> for Slab<ConnectionMeta> {
pub enum DatagramEvent {
/// The datagram is redirected to its `Connection`
ConnectionEvent(ConnectionHandle, ConnectionEvent),
/// The datagram has resulted in starting a new `Connection`
NewConnection(ConnectionHandle, Connection),
/// The datagram may result in starting a new `Connection`
NewConnection(Incoming),
/// Response generated directly by the endpoint
Response(Transmit),
}

/// An incoming connection for which the server has not yet begun its part of the handshake.
struct Incoming {
pub struct Incoming {
addresses: FourTuple,
ecn: Option<EcnCodepoint>,
packet: Packet,
Expand All @@ -947,11 +974,24 @@ struct Incoming {
}

impl Incoming {
/// The local IP address which was used when the peer established
/// the connection
///
/// This has the same behavior as [`Connection::local_ip`]
pub fn local_ip(&self) -> Option<IpAddr> {
self.addresses.local_ip
}

/// The peer's UDP address.
pub fn remote_address(&self) -> SocketAddr {
self.addresses.remote
}

/// Whether the socket address that is initiating this connection has been validated.
///
/// This means that the sender of the initial packet has proved that they can receive traffic
/// sent to `self.remote_address()`.
fn remote_address_validated(&self) -> bool {
pub fn remote_address_validated(&self) -> bool {
self.retry_src_cid.is_some()
}
}
Expand Down Expand Up @@ -1006,6 +1046,28 @@ pub enum ConnectError {
UnsupportedVersion,
}

/// Error type for attempting to accept an [`Incoming`]
#[derive(Debug)]
pub struct AcceptError {
/// Underlying error describing reason for failure
pub cause: ConnectionError,
/// Optional response to transmit back
pub response: Option<Transmit>,
}

/// Error for attempting to retry an [`Incoming`] which already bears an address
/// validation token from a previous retry
#[derive(Debug, Error)]
#[error("retry() with validated Incoming")]
pub struct RetryError(Incoming);

impl RetryError {
/// Get the [`Incoming`]
pub fn into_incoming(self) -> Incoming {
self.0
}
}

/// Reset Tokens which are associated with peer socket addresses
///
/// The standard `HashMap` is used since both `SocketAddr` and `ResetToken` are
Expand Down
4 changes: 3 additions & 1 deletion quinn-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ use crate::frame::Frame;
pub use crate::frame::{ApplicationClose, ConnectionClose, Datagram};

mod endpoint;
pub use crate::endpoint::{ConnectError, ConnectionHandle, DatagramEvent, Endpoint};
pub use crate::endpoint::{
AcceptError, ConnectError, ConnectionHandle, DatagramEvent, Endpoint, Incoming, RetryError,
};

mod shared;
pub use crate::shared::{ConnectionEvent, ConnectionId, EcnCodepoint, EndpointEvent};
Expand Down
40 changes: 25 additions & 15 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,8 @@ fn draft_version_compat() {
#[test]
fn stateless_retry() {
let _guard = subscribe();
let mut pair = Pair::new(
Default::default(),
ServerConfig {
use_retry: true,
..server_config()
},
);
let mut pair = Pair::default();
pair.server.incoming_connection_behavior = IncomingConnectionBehavior::Validate;
pair.connect();
}

Expand Down Expand Up @@ -459,13 +454,8 @@ fn high_latency_handshake() {
#[test]
fn zero_rtt_happypath() {
let _guard = subscribe();
let mut pair = Pair::new(
Default::default(),
ServerConfig {
use_retry: true,
..server_config()
},
);
let mut pair = Pair::default();
pair.server.incoming_connection_behavior = IncomingConnectionBehavior::Validate;
let config = client_config();

// Establish normal connection
Expand Down Expand Up @@ -1991,7 +1981,7 @@ fn connect_too_low_mtu() {

pair.begin_connect(client_config());
pair.drive();
pair.server.assert_no_accept()
pair.server.assert_no_accept();
}

#[test]
Expand Down Expand Up @@ -2772,3 +2762,23 @@ fn pure_sender_voluntarily_acks() {
let receiver_acks_final = pair.server_conn_mut(server_ch).stats().frame_rx.acks;
assert!(receiver_acks_final > receiver_acks_initial);
}

#[test]
fn reject_manually() {
let _guard = subscribe();
let mut pair = Pair::default();
pair.server.incoming_connection_behavior = IncomingConnectionBehavior::RejectAll;

// The server should now reject incoming connections.
let client_ch = pair.begin_connect(client_config());
pair.drive();
pair.server.assert_no_accept();
let client = pair.client.connections.get_mut(&client_ch).unwrap();
assert!(client.is_closed());
assert!(matches!(
client.poll(),
Some(Event::ConnectionLost {
reason: ConnectionError::ConnectionClosed(close)
}) if close.error_code == TransportErrorCode::CONNECTION_REFUSED
));
}
Loading

0 comments on commit 1ed2fb1

Please sign in to comment.