Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify and rename functions in Buffers interface #803

Merged
merged 1 commit into from
Sep 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl Connection {
}

pub fn consume_input(&mut self, amount: usize) {
self.transport.buffers().consume(amount)
self.transport.buffers().input_consume(amount)
}

pub fn close(self) {
Expand Down
2 changes: 1 addition & 1 deletion src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl Connector for ConnectProxyConnector {
}
continue;
};
buffers.consume(used_input);
buffers.input_consume(used_input);
break response;
};

Expand Down
2 changes: 1 addition & 1 deletion src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ fn send_request(
}

let buffers = connection.buffers();
let amount = flow.write(buffers.output_mut())?;
let amount = flow.write(buffers.output())?;
let timeout = timings.next_timeout(TimeoutReason::SendRequest);
connection.transmit_output(amount, timeout)?;
}
Expand Down
4 changes: 2 additions & 2 deletions src/tls/native_tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,9 @@ impl Transport for NativeTlsTransport {
let stream = self.stream.handshaken()?;
stream.get_mut().set_timeout(timeout);

let input = self.buffers.input_mut();
let input = self.buffers.input_append_buf();
let amount = stream.read(input)?;
self.buffers.add_filled(amount);
self.buffers.input_appended(amount);

Ok(amount > 0)
}
Expand Down
4 changes: 2 additions & 2 deletions src/tls/rustls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,9 @@ impl Transport for RustlsTransport {

self.stream.get_mut().set_timeout(timeout);

let input = self.buffers.input_mut();
let input = self.buffers.input_append_buf();
let amount = self.stream.read(input)?;
self.buffers.add_filled(amount);
self.buffers.input_appended(amount);

Ok(amount > 0)
}
Expand Down
44 changes: 13 additions & 31 deletions src/transport/buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@ use crate::util::ConsumeBuf;
///
/// In ureq, the buffers are provided by the [`Transport`](crate::transport::Transport).
pub trait Buffers {
/// Read only output buffers.
fn output(&self) -> &[u8];

/// Mut handle to output buffers to write new data. Data is always
/// written from `0..`, there is no expectation to retain half
/// used output buffers.
fn output_mut(&mut self) -> &mut [u8];
/// written from `0..`.
fn output(&mut self) -> &mut [u8];

/// Unconsumed bytes in the input buffer as read only.
///
/// The input buffer is written to by using `input_mut` followed by]
/// `add_filled` to indiciate how many additional bytes were added to the
/// The input buffer is written to by using [`Buffers::input_append_buf`] followed by
/// [`Buffers::input_appended`] to indiciate how many additional bytes were added to the
/// input.
///
/// This buffer should return the total unconsumed bytes.
Expand All @@ -25,28 +21,23 @@ pub trait Buffers {
fn input(&self) -> &[u8];

/// Input buffer to write to. This can be called despite there being unconsumed bytes
/// left.
/// left in the buffer already.
///
/// Example: if the internal buffer is `input: Vec<u8>`, and we have counters for
/// `filled: usize` and `consumed: usize`. This returns `&mut input[filled..]`.
fn input_mut(&mut self) -> &mut [u8];
fn input_append_buf(&mut self) -> &mut [u8];

/// Add a number of read bytes into `input_mut()`.
/// Add a number of read bytes into [`Buffers::input_append_buf()`].
///
/// Example: if the internal buffer is `input: Vec<u8>`, and we have counters for
/// `filled: usize` and `consumed: usize`, this increases `filled`.
fn add_filled(&mut self, amount: usize);
fn input_appended(&mut self, amount: usize);

/// Consume a number of bytes from `&input`.
///
/// Example: if the internal buffer is `input: Vec<u8>`, and we have counters for
/// `filled: usize` and `consumed: usize`, this increases `consumed`.
fn consume(&mut self, amount: usize);

/// Helper to access both input and output buffer at the same time (to work around the
/// borrow checker). This is used to handle incoming data from the remote peer for example
/// when getting chunked data as input and dechunking it into the output.
fn input_and_output(&mut self) -> (&[u8], &mut [u8]);
fn input_consume(&mut self, amount: usize);

/// Helper to get a scratch buffer (`tmp`) and the output buffer. This is used when
/// sending the request body in which case we use a `Read` trait to read from the
Expand Down Expand Up @@ -111,11 +102,7 @@ impl LazyBuffers {
}

impl Buffers for LazyBuffers {
fn output(&self) -> &[u8] {
&self.output
}

fn output_mut(&mut self) -> &mut [u8] {
fn output(&mut self) -> &mut [u8] {
self.ensure_allocation();
&mut self.output
}
Expand All @@ -124,16 +111,11 @@ impl Buffers for LazyBuffers {
self.input.unconsumed()
}

fn input_mut(&mut self) -> &mut [u8] {
fn input_append_buf(&mut self) -> &mut [u8] {
self.ensure_allocation();
self.input.free_mut()
}

fn input_and_output(&mut self) -> (&[u8], &mut [u8]) {
self.ensure_allocation();
(self.input.unconsumed(), &mut self.output)
}

fn tmp_and_output(&mut self) -> (&mut [u8], &mut [u8]) {
self.ensure_allocation();
const MIN_TMP_SIZE: usize = 10 * 1024;
Expand All @@ -153,11 +135,11 @@ impl Buffers for LazyBuffers {
(self.input.free_mut(), &mut self.output)
}

fn add_filled(&mut self, amount: usize) {
fn input_appended(&mut self, amount: usize) {
self.input.add_filled(amount);
}

fn consume(&mut self, amount: usize) {
fn input_consume(&mut self, amount: usize) {
self.progress = amount > 0;
self.input.consume(amount);
}
Expand Down
4 changes: 2 additions & 2 deletions src/transport/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ impl io::Read for TransportAdapter {

let max = buf.len().min(input.len());
buf[..max].copy_from_slice(&input[..max]);
self.transport.buffers().consume(max);
self.transport.buffers().input_consume(max);

Ok(max)
}
}

impl io::Write for TransportAdapter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let output = self.transport.buffers().output_mut();
let output = self.transport.buffers().output();

let max = buf.len().min(output.len());
output[..max].copy_from_slice(&buf[..max]);
Expand Down
14 changes: 8 additions & 6 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,20 +153,22 @@ impl<'a> ConnectionDetails<'a> {
/// For sending data, the order of calls are:
///
/// 1. [`Transport::buffers()`] to obtain the buffers.
/// 2. [`Buffers::output_mut()`], [`Buffers::input_and_output`] or [`Buffers::tmp_and_output`]
/// depending where in the lifce cycle of the request ureq is.
/// 2. [`Buffers::output()`] or [`Buffers::tmp_and_output`]
/// depending where in the life cycle of the request ureq is.
/// 3. [`Transport::transmit_output()`] to ask the transport to send/flush the `amount` of
/// buffers used in 2.
///
/// For receiving data, the order of calls are:
///
/// 1. [`Transport::await_input()`]
/// 2. The transport impl itself uses [`Buffers::input_mut()`] to fill a number
/// of bytes from the underlying transport and use [`Buffers::add_filled`] to
/// 2. The transport impl itself uses [`Buffers::input_append_buf()`] to fill a number
/// of bytes from the underlying transport and use [`Buffers::input_appended()`] to
/// tell the buffer how much been filled.
/// 3. [`Transport::buffers()`] to obtain the buffers
/// 4. [`Buffers::input()`] followed by [`Buffers::consume()`]. It's important to retain the
/// 4. [`Buffers::input()`] followed by [`Buffers::input_consume()`]. It's important to retain the
/// unconsumed bytes for the next call to `await_input()`. This is handled by [`LazyBuffers`].
/// It's important to call [`Buffers::input_consume()`] also with 0 consumed bytes since that's
/// how we keep track of whether the input is making progress.
///
pub trait Transport: Debug + Send + Sync {
/// Provide buffers for this transport.
Expand All @@ -181,7 +183,7 @@ pub trait Transport: Debug + Send + Sync {
fn transmit_output(&mut self, amount: usize, timeout: NextTimeout) -> Result<(), Error>;

/// Await input from the transport. The transport should internally use
/// [`Buffers::input_mut()`] followed by [`Buffers::add_filled()`] to
/// [`Buffers::input_append_buf()`] followed by [`Buffers::input_appended()`] to
/// store the incoming data.
fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, Error>;

Expand Down
4 changes: 2 additions & 2 deletions src/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ impl Transport for TcpTransport {
TcpStream::set_read_timeout,
)?;

let input = self.buffers.input_mut();
let input = self.buffers.input_append_buf();
let amount = match self.stream.read(input).normalize_would_block() {
Ok(v) => Ok(v),
Err(e) if e.kind() == io::ErrorKind::TimedOut => Err(Error::Timeout(timeout.reason)),
Err(e) => Err(e.into()),
}?;
self.buffers.add_filled(amount);
self.buffers.input_appended(amount);

Ok(amount > 0)
}
Expand Down
4 changes: 2 additions & 2 deletions src/transport/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ impl Transport for TestTransport {
}

fn await_input(&mut self, timeout: NextTimeout) -> Result<bool, Error> {
let input = self.buffers.input_mut();
let input = self.buffers.input_append_buf();
let buf = match self.rx.recv_timeout(timeout.after) {
Ok(v) => v,
Err(RecvTimeoutError::Timeout) => return Err(Error::Timeout(timeout.reason)),
Expand All @@ -422,7 +422,7 @@ impl Transport for TestTransport {
assert!(input.len() >= buf.len());
let max = input.len().min(buf.len());
input[..max].copy_from_slice(&buf[..]);
self.buffers.add_filled(max);
self.buffers.input_appended(max);
Ok(max > 0)
}

Expand Down
Loading