Skip to content

Commit

Permalink
Simplify and rename functions in Buffers interface
Browse files Browse the repository at this point in the history
  • Loading branch information
algesten committed Sep 1, 2024
1 parent 3a1b5ec commit d68d793
Show file tree
Hide file tree
Showing 10 changed files with 34 additions and 50 deletions.
2 changes: 1 addition & 1 deletion src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Connection {
}

pub fn consume_input(&mut self, amount: usize) {
self.transport.buffers().consume(amount)
self.transport.buffers().input_consume(amount)
}

pub fn close(self) {
Expand Down
2 changes: 1 addition & 1 deletion src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl Connector for ConnectProxyConnector {
}
continue;
};
buffers.consume(used_input);
buffers.input_consume(used_input);
break response;
};

Expand Down
2 changes: 1 addition & 1 deletion src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ fn send_request(
}

let buffers = connection.buffers();
let amount = flow.write(buffers.output_mut())?;
let amount = flow.write(buffers.output())?;
let timeout = timings.next_timeout(TimeoutReason::SendRequest);
connection.transmit_output(amount, timeout)?;
}
Expand Down
4 changes: 2 additions & 2 deletions src/tls/native_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ impl Transport for NativeTlsTransport {
let stream = self.stream.handshaken()?;
stream.get_mut().set_timeout(timeout);

let input = self.buffers.input_mut();
let input = self.buffers.input_append_buf();
let amount = stream.read(input)?;
self.buffers.add_filled(amount);
self.buffers.input_appended(amount);

Ok(amount > 0)
}
Expand Down
4 changes: 2 additions & 2 deletions src/tls/rustls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ impl Transport for RustlsTransport {

self.stream.get_mut().set_timeout(timeout);

let input = self.buffers.input_mut();
let input = self.buffers.input_append_buf();
let amount = self.stream.read(input)?;
self.buffers.add_filled(amount);
self.buffers.input_appended(amount);

Ok(amount > 0)
}
Expand Down
44 changes: 13 additions & 31 deletions src/transport/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@ use crate::util::ConsumeBuf;
///
/// In ureq, the buffers are provided by the [`Transport`](crate::transport::Transport).
pub trait Buffers {
/// Read only output buffers.
fn output(&self) -> &[u8];

/// Mut handle to output buffers to write new data. Data is always
/// written from `0..`, there is no expectation to retain half
/// used output buffers.
fn output_mut(&mut self) -> &mut [u8];
/// written from `0..`.
fn output(&mut self) -> &mut [u8];

/// Unconsumed bytes in the input buffer as read only.
///
/// The input buffer is written to by using `input_mut` followed by]
/// `add_filled` to indiciate how many additional bytes were added to the
/// The input buffer is written to by using [`Buffers::input_append_buf`] followed by
/// [`Buffers::input_appended`] to indiciate how many additional bytes were added to the
/// input.
///
/// This buffer should return the total unconsumed bytes.
Expand All @@ -25,28 +21,23 @@ pub trait Buffers {
fn input(&self) -> &[u8];

/// Input buffer to write to. This can be called despite there being unconsumed bytes
/// left.
/// left in the buffer already.
///
/// Example: if the internal buffer is `input: Vec<u8>`, and we have counters for
/// `filled: usize` and `consumed: usize`. This returns `&mut input[filled..]`.
fn input_mut(&mut self) -> &mut [u8];
fn input_append_buf(&mut self) -> &mut [u8];

/// Add a number of read bytes into `input_mut()`.
/// Add a number of read bytes into [`Buffers::input_append_buf()`].
///
/// Example: if the internal buffer is `input: Vec<u8>`, and we have counters for
/// `filled: usize` and `consumed: usize`, this increases `filled`.
fn add_filled(&mut self, amount: usize);
fn input_appended(&mut self, amount: usize);

/// Consume a number of bytes from `&input`.
///
/// Example: if the internal buffer is `input: Vec<u8>`, and we have counters for
/// `filled: usize` and `consumed: usize`, this increases `consumed`.
fn consume(&mut self, amount: usize);

/// Helper to access both input and output buffer at the same time (to work around the
/// borrow checker). This is used to handle incoming data from the remote peer for example
/// when getting chunked data as input and dechunking it into the output.
fn input_and_output(&mut self) -> (&[u8], &mut [u8]);
fn input_consume(&mut self, amount: usize);

/// Helper to get a scratch buffer (`tmp`) and the output buffer. This is used when
/// sending the request body in which case we use a `Read` trait to read from the
Expand Down Expand Up @@ -111,11 +102,7 @@ impl LazyBuffers {
}

impl Buffers for LazyBuffers {
fn output(&self) -> &[u8] {
&self.output
}

fn output_mut(&mut self) -> &mut [u8] {
fn output(&mut self) -> &mut [u8] {
self.ensure_allocation();
&mut self.output
}
Expand All @@ -124,16 +111,11 @@ impl Buffers for LazyBuffers {
self.input.unconsumed()
}

fn input_mut(&mut self) -> &mut [u8] {
fn input_append_buf(&mut self) -> &mut [u8] {
self.ensure_allocation();
self.input.free_mut()
}

fn input_and_output(&mut self) -> (&[u8], &mut [u8]) {
self.ensure_allocation();
(self.input.unconsumed(), &mut self.output)
}

fn tmp_and_output(&mut self) -> (&mut [u8], &mut [u8]) {
self.ensure_allocation();
const MIN_TMP_SIZE: usize = 10 * 1024;
Expand All @@ -153,11 +135,11 @@ impl Buffers for LazyBuffers {
(self.input.free_mut(), &mut self.output)
}

fn add_filled(&mut self, amount: usize) {
fn input_appended(&mut self, amount: usize) {
self.input.add_filled(amount);
}

fn consume(&mut self, amount: usize) {
fn input_consume(&mut self, amount: usize) {
self.progress = amount > 0;
self.input.consume(amount);
}
Expand Down
4 changes: 2 additions & 2 deletions src/transport/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ impl io::Read for TransportAdapter {

let max = buf.len().min(input.len());
buf[..max].copy_from_slice(&input[..max]);
self.transport.buffers().consume(max);
self.transport.buffers().input_consume(max);

Ok(max)
}
}

impl io::Write for TransportAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let output = self.transport.buffers().output_mut();
let output = self.transport.buffers().output();

let max = buf.len().min(output.len());
output[..max].copy_from_slice(&buf[..max]);
Expand Down
14 changes: 8 additions & 6 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,22 @@ impl<'a> ConnectionDetails<'a> {
/// For sending data, the order of calls are:
///
/// 1. [`Transport::buffers()`] to obtain the buffers.
/// 2. [`Buffers::output_mut()`], [`Buffers::input_and_output`] or [`Buffers::tmp_and_output`]
/// depending where in the lifce cycle of the request ureq is.
/// 2. [`Buffers::output()`] or [`Buffers::tmp_and_output`]
/// depending where in the life cycle of the request ureq is.
/// 3. [`Transport::transmit_output()`] to ask the transport to send/flush the `amount` of
/// buffers used in 2.
///
/// For receiving data, the order of calls are:
///
/// 1. [`Transport::await_input()`]
/// 2. The transport impl itself uses [`Buffers::input_mut()`] to fill a number
/// of bytes from the underlying transport and use [`Buffers::add_filled`] to
/// 2. The transport impl itself uses [`Buffers::input_append_buf()`] to fill a number
/// of bytes from the underlying transport and use [`Buffers::input_appended()`] to
/// tell the buffer how much been filled.
/// 3. [`Transport::buffers()`] to obtain the buffers
/// 4. [`Buffers::input()`] followed by [`Buffers::consume()`]. It's important to retain the
/// 4. [`Buffers::input()`] followed by [`Buffers::input_consume()`]. It's important to retain the
/// unconsumed bytes for the next call to `await_input()`. This is handled by [`LazyBuffers`].
/// It's important to call [`Buffers::input_consume()`] also with 0 consumed bytes since that's
/// how we keep track of whether the input is making progress.
///
pub trait Transport: Debug + Send + Sync {
/// Provide buffers for this transport.
Expand All @@ -181,7 +183,7 @@ pub trait Transport: Debug + Send + Sync {
fn transmit_output(&mut self, amount: usize, timeout: NextTimeout) -> Result<(), Error>;

/// Await input from the transport. The transport should internally use
/// [`Buffers::input_mut()`] followed by [`Buffers::add_filled()`] to
/// [`Buffers::input_append_buf()`] followed by [`Buffers::input_appended()`] to
/// store the incoming data.
fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, Error>;

Expand Down
4 changes: 2 additions & 2 deletions src/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ impl Transport for TcpTransport {
TcpStream::set_read_timeout,
)?;

let input = self.buffers.input_mut();
let input = self.buffers.input_append_buf();
let amount = match self.stream.read(input).normalize_would_block() {
Ok(v) => Ok(v),
Err(e) if e.kind() == io::ErrorKind::TimedOut => Err(Error::Timeout(timeout.reason)),
Err(e) => Err(e.into()),
}?;
self.buffers.add_filled(amount);
self.buffers.input_appended(amount);

Ok(amount > 0)
}
Expand Down
4 changes: 2 additions & 2 deletions src/transport/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ impl Transport for TestTransport {
}

fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, Error> {
let input = self.buffers.input_mut();
let input = self.buffers.input_append_buf();
let buf = match self.rx.recv_timeout(timeout.after) {
Ok(v) => v,
Err(RecvTimeoutError::Timeout) => return Err(Error::Timeout(timeout.reason)),
Expand All @@ -422,7 +422,7 @@ impl Transport for TestTransport {
assert!(input.len() >= buf.len());
let max = input.len().min(buf.len());
input[..max].copy_from_slice(&buf[..]);
self.buffers.add_filled(max);
self.buffers.input_appended(max);
Ok(max > 0)
}

Expand Down

0 comments on commit d68d793

Please sign in to comment.