Skip to content

Commit

Permalink
feat(bin): don't allocate in server UDP recv path (#2202)
Browse files Browse the repository at this point in the history
* feat(bin): don't allocate in server UDP recv path

Previously the `neqo-bin` server would read a set of
datagrams from the socket and allocate them:

``` rust
let dgrams: Vec<Datagram> = dgrams.map(|d| d.to_owned()).collect();
```

This was done out of convenience, as handling `Datagram<&[u8]>`s, each borrowing
from `self.recv_buf`, is hard to get right across multiple `&mut self`
functions, that is here `self.run`, `self.process` and `self.find_socket`.

This commit combines `self.process` and `self.find_socket` and passes a socket
index, instead of the read `Datagram`s from `self.run` to `self.process`, thus
making the Rust borrow checker happy to handle borrowing `Datagram<&[u8]>`s
instead of owning `Datagram`s.

* next().or_else()

* re-introduce find_socket

* Simplify process loops

* Make find_socket and read_and_process associated functions

* Reduce diff
  • Loading branch information
mxinden authored Nov 13, 2024
1 parent 1cae854 commit 978aa4e
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 28 deletions.
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output {
fn process(&mut self, dgram: Option<Datagram<&[u8]>>, now: Instant) -> Output {
self.server.process(dgram, now)
}

Expand Down
2 changes: 1 addition & 1 deletion neqo-bin/src/server/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Display for HttpServer {
}

impl super::HttpServer for HttpServer {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> neqo_http3::Output {
fn process(&mut self, dgram: Option<Datagram<&[u8]>>, now: Instant) -> neqo_http3::Output {
self.server.process(dgram, now)
}

Expand Down
84 changes: 58 additions & 26 deletions neqo-bin/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ fn qns_read_response(filename: &str) -> Result<Vec<u8>, io::Error> {

#[allow(clippy::module_name_repetitions)]
pub trait HttpServer: Display {
fn process(&mut self, dgram: Option<Datagram>, now: Instant) -> Output;
fn process(&mut self, dgram: Option<Datagram<&[u8]>>, now: Instant) -> Output;
fn process_events(&mut self, now: Instant);
fn has_events(&self) -> bool;
}
Expand Down Expand Up @@ -222,8 +222,11 @@ impl ServerRunner {
}

/// Tries to find a socket, but then just falls back to sending from the first.
fn find_socket(&mut self, addr: SocketAddr) -> &mut crate::udp::Socket {
let ((_host, first_socket), rest) = self.sockets.split_first_mut().unwrap();
fn find_socket(
sockets: &mut [(SocketAddr, crate::udp::Socket)],
addr: SocketAddr,
) -> &mut crate::udp::Socket {
let ((_host, first_socket), rest) = sockets.split_first_mut().unwrap();
rest.iter_mut()
.map(|(_host, socket)| socket)
.find(|socket| {
Expand All @@ -235,27 +238,67 @@ impl ServerRunner {
.unwrap_or(first_socket)
}

async fn process(&mut self, mut dgram: Option<Datagram>) -> Result<(), io::Error> {
// Free function (i.e. not taking `&mut self: ServerRunner`) to be callable by
// `ServerRunner::read_and_process` while holding a reference to
// `ServerRunner::recv_buf`.
async fn process_inner(
server: &mut Box<dyn HttpServer>,
timeout: &mut Option<Pin<Box<Sleep>>>,
sockets: &mut [(SocketAddr, crate::udp::Socket)],
now: &dyn Fn() -> Instant,
mut input_dgram: Option<Datagram<&[u8]>>,
) -> Result<(), io::Error> {
loop {
match self.server.process(dgram.take(), (self.now)()) {
match server.process(input_dgram.take(), now()) {
Output::Datagram(dgram) => {
let socket = self.find_socket(dgram.source());
let socket = Self::find_socket(sockets, dgram.source());
socket.writable().await?;
socket.send(&dgram)?;
}
Output::Callback(new_timeout) => {
qdebug!("Setting timeout of {:?}", new_timeout);
self.timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
break;
}
Output::None => {
*timeout = Some(Box::pin(tokio::time::sleep(new_timeout)));
break;
}
Output::None => break,
}
}
Ok(())
}

async fn read_and_process(&mut self, sockets_index: usize) -> Result<(), io::Error> {
loop {
let (host, socket) = self.sockets.get_mut(sockets_index).unwrap();
let Some(input_dgrams) = socket.recv(*host, &mut self.recv_buf)? else {
break;
};

for input_dgram in input_dgrams {
Self::process_inner(
&mut self.server,
&mut self.timeout,
&mut self.sockets,
&self.now,
Some(input_dgram),
)
.await?;
}
}

Ok(())
}

async fn process(&mut self) -> Result<(), io::Error> {
Self::process_inner(
&mut self.server,
&mut self.timeout,
&mut self.sockets,
&self.now,
None,
)
.await
}

// Wait for any of the sockets to be readable or the timeout to fire.
async fn ready(&mut self) -> Result<Ready, io::Error> {
let sockets_ready = select_all(
Expand All @@ -278,30 +321,19 @@ impl ServerRunner {
pub async fn run(mut self) -> Res<()> {
loop {
self.server.process_events((self.now)());

self.process(None).await?;
self.process().await?;

if self.server.has_events() {
continue;
}

match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let Some(dgrams) = socket.recv(*host, &mut self.recv_buf)? else {
break;
};
if dgrams.len() == 0 {
break;
}
let dgrams: Vec<Datagram> = dgrams.map(|d| d.to_owned()).collect();
for dgram in dgrams {
self.process(Some(dgram)).await?;
}
},
Ready::Socket(sockets_index) => {
self.read_and_process(sockets_index).await?;
}
Ready::Timeout => {
self.timeout = None;
self.process(None).await?;
self.process().await?;
}
}
}
Expand Down

0 comments on commit 978aa4e

Please sign in to comment.