Skip to content

Commit

Permalink
feat(udp): use recvmmsg
Browse files Browse the repository at this point in the history
Read up to `BATCH_SIZE = 32` with single `recvmmsg` syscall.

Previously `neqo_bin::udp::Socket::recv` would use `recvmmsg`, but provide a
single buffer to write into only, effectively using `recvmsg` instead of
`recvmmsg`.

With this commit `Socket::recv` provides `BATCH_SIZE` number of buffers on each
`recvmmsg` syscall, thus reading more than one datagram at a time if available.
  • Loading branch information
mxinden committed Mar 15, 2024
1 parent 177385e commit 08b4f32
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 137 deletions.
4 changes: 4 additions & 0 deletions neqo-bin/src/bin/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ impl super::Client for Connection {
self.process(dgram, now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.process_input(dgram, now);
}

fn close<S>(&mut self, now: Instant, app_error: neqo_transport::AppError, msg: S)
where
S: AsRef<str> + std::fmt::Display,
Expand Down
4 changes: 4 additions & 0 deletions neqo-bin/src/bin/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ impl super::Client for Http3Client {
self.process(dgram, now)
}

fn process_input(&mut self, dgram: &Datagram, now: Instant) {
self.process_input(dgram, now);
}

fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display,
Expand Down
12 changes: 8 additions & 4 deletions neqo-bin/src/bin/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,9 @@ trait Handler {

/// Network client, e.g. [`neqo_transport::Connection`] or [`neqo_http3::Http3Client`].
trait Client {
// TODO: datagram option needed?
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process_input(&mut self, dgram: &Datagram, now: Instant);
fn close<S>(&mut self, now: Instant, app_error: AppError, msg: S)
where
S: AsRef<str> + Display;
Expand Down Expand Up @@ -365,11 +367,13 @@ impl<'a, H: Handler> Runner<'a, H> {
match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
let mut is_empty = true;
for dgram in dgrams {
is_empty = false;
self.client.process_input(&dgram, Instant::now());
}
for dgram in &dgrams {
self.process(Some(dgram)).await?;
if is_empty {
break;
}
self.handler.maybe_key_update(&mut self.client)?;
},
Expand Down
4 changes: 3 additions & 1 deletion neqo-bin/src/bin/server/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ fn qns_read_response(filename: &str) -> Option<Vec<u8>> {
}

trait HttpServer: Display {
// TODO: Remove the option?
fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output;
fn process_events(&mut self, args: &Args, now: Instant);
fn set_qlog_dir(&mut self, dir: Option<PathBuf>);
Expand Down Expand Up @@ -543,7 +544,8 @@ impl ServersRunner {
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgrams = socket.recv(host)?;
// TODO: Remove collect.
let dgrams: Vec<_> = socket.recv(host)?.collect();
if dgrams.is_empty() {
break;
}
Expand Down
198 changes: 66 additions & 132 deletions neqo-bin/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use std::{
io::{self, IoSliceMut},
mem::MaybeUninit,
net::{SocketAddr, ToSocketAddrs},
slice,
};
Expand All @@ -17,6 +18,13 @@ use neqo_common::{Datagram, IpTos};
use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState};
use tokio::io::Interest;

#[cfg(not(any(target_os = "macos", target_os = "ios")))]
// Chosen somewhat arbitrarily; might benefit from additional tuning.
pub(crate) const BATCH_SIZE: usize = 32;

#[cfg(any(target_os = "macos", target_os = "ios"))]
pub(crate) const BATCH_SIZE: usize = 1;

/// Socket receive buffer size.
///
/// Allows reading multiple datagrams in a single [`Socket::recv`] call.
Expand All @@ -25,7 +33,8 @@ const RECV_BUF_SIZE: usize = u16::MAX as usize;
pub struct Socket {
socket: tokio::net::UdpSocket,
state: UdpSocketState,
recv_buf: Vec<u8>,
// TODO: Rename
recv_buf: [Vec<u8>; BATCH_SIZE],
}

impl Socket {
Expand All @@ -36,7 +45,11 @@ impl Socket {
Ok(Self {
state: quinn_udp::UdpSocketState::new((&socket).into())?,
socket: tokio::net::UdpSocket::from_std(socket)?,
recv_buf: vec![0; RECV_BUF_SIZE],
recv_buf: (0..BATCH_SIZE)
.map(|_| vec![0; RECV_BUF_SIZE])
.collect::<Vec<_>>()
.try_into()
.expect("successful array instantiation"),
})
}

Expand Down Expand Up @@ -76,147 +89,68 @@ impl Socket {
}

/// Receive a UDP datagram on the specified socket.
pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
let mut meta = RecvMeta::default();

match self.socket.try_io(Interest::READABLE, || {
self.state.recv(
(&self.socket).into(),
&mut [IoSliceMut::new(&mut self.recv_buf)],
slice::from_mut(&mut meta),
)
pub fn recv<'a>(
&'a mut self,
local_address: &'a SocketAddr,
) -> Result<impl Iterator<Item = Datagram> + 'a, io::Error> {
let mut metas = [RecvMeta::default(); BATCH_SIZE];

// TODO: Safe?
let mut iovs = MaybeUninit::<[IoSliceMut<'_>; BATCH_SIZE]>::uninit();
for (i, buf) in self.recv_buf.iter_mut().enumerate() {
unsafe {
iovs.as_mut_ptr()
.cast::<IoSliceMut>()
.add(i)
.write(IoSliceMut::new(buf));
};
}
let mut iovs = unsafe { iovs.assume_init() };

let msgs = match self.socket.try_io(Interest::READABLE, || {
self.state
.recv((&self.socket).into(), &mut iovs, &mut metas)
}) {
Ok(n) => {
assert_eq!(n, 1, "only passed one slice");
}
Ok(n) => n,
Err(ref err)
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted =>
{
return Ok(vec![])
0
}
Err(err) => {
return Err(err);
}
};

if meta.len == 0 {
eprintln!("zero length datagram received?");
return Ok(vec![]);
}
if meta.len == self.recv_buf.len() {
eprintln!(
"Might have received more than {} bytes",
self.recv_buf.len()
);
}

Ok(self.recv_buf[0..meta.len]
.chunks(meta.stride.min(self.recv_buf.len()))
.map(|d| {
Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
d,
)
})
.collect())
}
}

#[cfg(test)]
mod tests {
use neqo_common::{IpTosDscp, IpTosEcn};

use super::*;

#[tokio::test]
async fn datagram_tos() -> Result<(), io::Error> {
let sender = Socket::bind("127.0.0.1:0")?;
let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut receiver = Socket::bind(receiver_addr)?;

let datagram = Datagram::new(
sender.local_addr()?,
receiver.local_addr()?,
IpTos::from((IpTosDscp::Le, IpTosEcn::Ect1)),
None,
"Hello, world!".as_bytes().to_vec(),
);

sender.writable().await?;
sender.send(datagram.clone())?;

receiver.readable().await?;
let received_datagram = receiver
.recv(&receiver_addr)
.expect("receive to succeed")
// TODO
// if meta.len == 0 {
// eprintln!("zero length datagram received?");
// return Ok(vec![]);
// }
// if meta.len == self.recv_buf.len() {
// eprintln!(
// "Might have received more than {} bytes",
// self.recv_buf.len()
// );
// }

Ok(metas
.into_iter()
.next()
.expect("receive to yield datagram");

// Assert that the ECN is correct.
assert_eq!(
IpTosEcn::from(datagram.tos()),
IpTosEcn::from(received_datagram.tos())
);

Ok(())
}

/// Expect [`Socket::recv`] to handle multiple [`Datagram`]s on GRO read.
#[tokio::test]
#[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)]
async fn many_datagrams_through_gro() -> Result<(), io::Error> {
const SEGMENT_SIZE: usize = 128;

let sender = Socket::bind("127.0.0.1:0")?;
let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut receiver = Socket::bind(receiver_addr)?;

// `neqo_common::udp::Socket::send` does not yet
// (https://github.com/mozilla/neqo/issues/1693) support GSO. Use
// `quinn_udp` directly.
let max_gso_segments = sender.state.max_gso_segments();
let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments];
let transmit = Transmit {
destination: receiver.local_addr()?,
ecn: EcnCodepoint::from_bits(Into::<u8>::into(IpTos::from((
IpTosDscp::Le,
IpTosEcn::Ect1,
)))),
contents: msg.clone().into(),
segment_size: Some(SEGMENT_SIZE),
src_ip: None,
};
sender.writable().await?;
let n = sender.socket.try_io(Interest::WRITABLE, || {
sender
.state
.send((&sender.socket).into(), slice::from_ref(&transmit))
})?;
assert_eq!(n, 1, "only passed one slice");

// Allow for one GSO sendmmsg to result in multiple GRO recvmmsg.
let mut num_received = 0;
while num_received < max_gso_segments {
receiver.readable().await?;
receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.into_iter()
.for_each(|d| {
assert_eq!(
SEGMENT_SIZE,
d.len(),
"Expect received datagrams to have same length as sent datagrams."
);
num_received += 1;
});
}

Ok(())
.zip(self.recv_buf.iter())
.take(msgs)
.flat_map(move |(meta, buf)| {
buf[0..meta.len]
.chunks(meta.stride.min(buf.len()))
.map(move |d| {
Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
d,
)
})
}))
}
}

0 comments on commit 08b4f32

Please sign in to comment.