diff --git a/src/pool.rs b/src/pool.rs index ea0c4fbd..df08b07b 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -101,7 +101,7 @@ impl Connection { } pub fn consume_input(&mut self, amount: usize) { - self.transport.buffers().consume(amount) + self.transport.buffers().input_consume(amount) } pub fn close(self) { diff --git a/src/proxy.rs b/src/proxy.rs index cdcc49d9..2a01ccec 100644 --- a/src/proxy.rs +++ b/src/proxy.rs @@ -231,7 +231,7 @@ impl Connector for ConnectProxyConnector { } continue; }; - buffers.consume(used_input); + buffers.input_consume(used_input); break response; }; diff --git a/src/run.rs b/src/run.rs index bfcce636..a5046f96 100644 --- a/src/run.rs +++ b/src/run.rs @@ -314,7 +314,7 @@ fn send_request( } let buffers = connection.buffers(); - let amount = flow.write(buffers.output_mut())?; + let amount = flow.write(buffers.output())?; let timeout = timings.next_timeout(TimeoutReason::SendRequest); connection.transmit_output(amount, timeout)?; } diff --git a/src/tls/native_tls.rs b/src/tls/native_tls.rs index 48c6222b..689235ff 100644 --- a/src/tls/native_tls.rs +++ b/src/tls/native_tls.rs @@ -195,9 +195,9 @@ impl Transport for NativeTlsTransport { let stream = self.stream.handshaken()?; stream.get_mut().set_timeout(timeout); - let input = self.buffers.input_mut(); + let input = self.buffers.input_append_buf(); let amount = stream.read(input)?; - self.buffers.add_filled(amount); + self.buffers.input_appended(amount); Ok(amount > 0) } diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index 736bae9e..8e5e4b67 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -185,9 +185,9 @@ impl Transport for RustlsTransport { self.stream.get_mut().set_timeout(timeout); - let input = self.buffers.input_mut(); + let input = self.buffers.input_append_buf(); let amount = self.stream.read(input)?; - self.buffers.add_filled(amount); + self.buffers.input_appended(amount); Ok(amount > 0) } diff --git a/src/transport/buf.rs b/src/transport/buf.rs index 52ba374c..d25035db 100644 --- a/src/transport/buf.rs +++ b/src/transport/buf.rs @@ -4,18 +4,14 @@ use crate::util::ConsumeBuf; /// /// In ureq, the buffers are provided by the [`Transport`](crate::transport::Transport). pub trait Buffers { - /// Read only output buffers. - fn output(&self) -> &[u8]; - /// Mut handle to output buffers to write new data. Data is always - /// written from `0..`, there is no expectation to retain half - /// used output buffers. - fn output_mut(&mut self) -> &mut [u8]; + /// written from `0..`. + fn output(&mut self) -> &mut [u8]; /// Unconsumed bytes in the input buffer as read only. /// - /// The input buffer is written to by using `input_mut` followed by] - /// `add_filled` to indiciate how many additional bytes were added to the + /// The input buffer is written to by using [`Buffers::input_append_buf`] followed by + /// [`Buffers::input_appended`] to indiciate how many additional bytes were added to the /// input. /// /// This buffer should return the total unconsumed bytes. @@ -25,28 +21,23 @@ pub trait Buffers { fn input(&self) -> &[u8]; /// Input buffer to write to. This can be called despite there being unconsumed bytes - /// left. + /// left in the buffer already. /// /// Example: if the internal buffer is `input: Vec`, and we have counters for /// `filled: usize` and `consumed: usize`. This returns `&mut input[filled..]`. - fn input_mut(&mut self) -> &mut [u8]; + fn input_append_buf(&mut self) -> &mut [u8]; - /// Add a number of read bytes into `input_mut()`. + /// Add a number of read bytes into [`Buffers::input_append_buf()`]. /// /// Example: if the internal buffer is `input: Vec`, and we have counters for /// `filled: usize` and `consumed: usize`, this increases `filled`. - fn add_filled(&mut self, amount: usize); + fn input_appended(&mut self, amount: usize); /// Consume a number of bytes from `&input`. /// /// Example: if the internal buffer is `input: Vec`, and we have counters for /// `filled: usize` and `consumed: usize`, this increases `consumed`. - fn consume(&mut self, amount: usize); - - /// Helper to access both input and output buffer at the same time (to work around the - /// borrow checker). This is used to handle incoming data from the remote peer for example - /// when getting chunked data as input and dechunking it into the output. - fn input_and_output(&mut self) -> (&[u8], &mut [u8]); + fn input_consume(&mut self, amount: usize); /// Helper to get a scratch buffer (`tmp`) and the output buffer. This is used when /// sending the request body in which case we use a `Read` trait to read from the @@ -111,11 +102,7 @@ impl LazyBuffers { } impl Buffers for LazyBuffers { - fn output(&self) -> &[u8] { - &self.output - } - - fn output_mut(&mut self) -> &mut [u8] { + fn output(&mut self) -> &mut [u8] { self.ensure_allocation(); &mut self.output } @@ -124,16 +111,11 @@ impl Buffers for LazyBuffers { self.input.unconsumed() } - fn input_mut(&mut self) -> &mut [u8] { + fn input_append_buf(&mut self) -> &mut [u8] { self.ensure_allocation(); self.input.free_mut() } - fn input_and_output(&mut self) -> (&[u8], &mut [u8]) { - self.ensure_allocation(); - (self.input.unconsumed(), &mut self.output) - } - fn tmp_and_output(&mut self) -> (&mut [u8], &mut [u8]) { self.ensure_allocation(); const MIN_TMP_SIZE: usize = 10 * 1024; @@ -153,11 +135,11 @@ impl Buffers for LazyBuffers { (self.input.free_mut(), &mut self.output) } - fn add_filled(&mut self, amount: usize) { + fn input_appended(&mut self, amount: usize) { self.input.add_filled(amount); } - fn consume(&mut self, amount: usize) { + fn input_consume(&mut self, amount: usize) { self.progress = amount > 0; self.input.consume(amount); } diff --git a/src/transport/io.rs b/src/transport/io.rs index 465069a2..6466cc05 100644 --- a/src/transport/io.rs +++ b/src/transport/io.rs @@ -62,7 +62,7 @@ impl io::Read for TransportAdapter { let max = buf.len().min(input.len()); buf[..max].copy_from_slice(&input[..max]); - self.transport.buffers().consume(max); + self.transport.buffers().input_consume(max); Ok(max) } @@ -70,7 +70,7 @@ impl io::Read for TransportAdapter { impl io::Write for TransportAdapter { fn write(&mut self, buf: &[u8]) -> io::Result { - let output = self.transport.buffers().output_mut(); + let output = self.transport.buffers().output(); let max = buf.len().min(output.len()); output[..max].copy_from_slice(&buf[..max]); diff --git a/src/transport/mod.rs b/src/transport/mod.rs index da9c6727..fe98b79e 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -153,20 +153,22 @@ impl<'a> ConnectionDetails<'a> { /// For sending data, the order of calls are: /// /// 1. [`Transport::buffers()`] to obtain the buffers. -/// 2. [`Buffers::output_mut()`], [`Buffers::input_and_output`] or [`Buffers::tmp_and_output`] -/// depending where in the lifce cycle of the request ureq is. +/// 2. [`Buffers::output()`] or [`Buffers::tmp_and_output`] +/// depending where in the life cycle of the request ureq is. /// 3. [`Transport::transmit_output()`] to ask the transport to send/flush the `amount` of /// buffers used in 2. /// /// For receiving data, the order of calls are: /// /// 1. [`Transport::await_input()`] -/// 2. The transport impl itself uses [`Buffers::input_mut()`] to fill a number -/// of bytes from the underlying transport and use [`Buffers::add_filled`] to +/// 2. The transport impl itself uses [`Buffers::input_append_buf()`] to fill a number +/// of bytes from the underlying transport and use [`Buffers::input_appended()`] to /// tell the buffer how much been filled. /// 3. [`Transport::buffers()`] to obtain the buffers -/// 4. [`Buffers::input()`] followed by [`Buffers::consume()`]. It's important to retain the +/// 4. [`Buffers::input()`] followed by [`Buffers::input_consume()`]. It's important to retain the /// unconsumed bytes for the next call to `await_input()`. This is handled by [`LazyBuffers`]. +/// It's important to call [`Buffers::input_consume()`] also with 0 consumed bytes since that's +/// how we keep track of whether the input is making progress. /// pub trait Transport: Debug + Send + Sync { /// Provide buffers for this transport. @@ -181,7 +183,7 @@ pub trait Transport: Debug + Send + Sync { fn transmit_output(&mut self, amount: usize, timeout: NextTimeout) -> Result<(), Error>; /// Await input from the transport. The transport should internally use - /// [`Buffers::input_mut()`] followed by [`Buffers::add_filled()`] to + /// [`Buffers::input_append_buf()`] followed by [`Buffers::input_appended()`] to /// store the incoming data. fn await_input(&mut self, timeout: NextTimeout) -> Result; diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index 50d7cc9c..2779f56e 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -164,13 +164,13 @@ impl Transport for TcpTransport { TcpStream::set_read_timeout, )?; - let input = self.buffers.input_mut(); + let input = self.buffers.input_append_buf(); let amount = match self.stream.read(input).normalize_would_block() { Ok(v) => Ok(v), Err(e) if e.kind() == io::ErrorKind::TimedOut => Err(Error::Timeout(timeout.reason)), Err(e) => Err(e.into()), }?; - self.buffers.add_filled(amount); + self.buffers.input_appended(amount); Ok(amount > 0) } diff --git a/src/transport/test.rs b/src/transport/test.rs index b2b7d46c..d6a89b47 100644 --- a/src/transport/test.rs +++ b/src/transport/test.rs @@ -406,7 +406,7 @@ impl Transport for TestTransport { } fn await_input(&mut self, timeout: NextTimeout) -> Result { - let input = self.buffers.input_mut(); + let input = self.buffers.input_append_buf(); let buf = match self.rx.recv_timeout(timeout.after) { Ok(v) => v, Err(RecvTimeoutError::Timeout) => return Err(Error::Timeout(timeout.reason)), @@ -422,7 +422,7 @@ impl Transport for TestTransport { assert!(input.len() >= buf.len()); let max = input.len().min(buf.len()); input[..max].copy_from_slice(&buf[..]); - self.buffers.add_filled(max); + self.buffers.input_appended(max); Ok(max > 0) }