From fbcb2786398c4be467cf8deed6a319040124fb27 Mon Sep 17 00:00:00 2001 From: Martin Algesten Date: Sun, 1 Sep 2024 13:14:08 +0200 Subject: [PATCH 1/2] Move CallTimings around --- src/config.rs | 160 +--------------------------- src/error.rs | 53 +--------- src/lib.rs | 4 +- src/pool.rs | 4 +- src/resolver.rs | 4 +- src/run.rs | 40 +++---- src/timings.rs | 230 +++++++++++++++++++++++++++++++++++++++++ src/tls/native_tls.rs | 1 - src/tls/rustls.rs | 3 +- src/transport/io.rs | 8 +- src/transport/mod.rs | 4 +- src/transport/socks.rs | 3 +- src/transport/tcp.rs | 4 +- src/transport/test.rs | 4 +- src/transport/time.rs | 23 ----- 15 files changed, 275 insertions(+), 270 deletions(-) create mode 100644 src/timings.rs diff --git a/src/config.rs b/src/config.rs index 255103ca..9f9a63e4 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,4 @@ use std::fmt; -use std::sync::Arc; use std::time::Duration; use hoot::client::flow::RedirectAuthHeaders; @@ -7,8 +6,7 @@ use http::Uri; use crate::middleware::MiddlewareChain; use crate::resolver::IpFamily; -use crate::transport::time::{Instant, NextTimeout}; -use crate::{Proxy, TimeoutReason}; +use crate::Proxy; #[cfg(feature = "_tls")] use crate::tls::TlsConfig; @@ -325,159 +323,3 @@ impl fmt::Debug for Timeouts { .finish() } } - -#[derive(Debug, Default)] -pub(crate) struct CallTimings { - pub timeouts: Timeouts, - pub current_time: CurrentTime, - pub time_global_start: Option, - pub time_call_start: Option, - pub time_resolve: Option, - pub time_connect: Option, - pub time_send_request: Option, - pub time_send_body: Option, - pub time_await_100: Option, - pub time_recv_response: Option, - pub time_recv_body: Option, -} - -#[derive(Clone)] -pub(crate) struct CurrentTime(Arc Instant + Send + Sync + 'static>); - -impl CurrentTime { - pub(crate) fn now(&self) -> Instant { - self.0() - } -} - -impl CallTimings { - pub(crate) fn now(&self) -> Instant { - self.current_time.now() - } - - pub(crate) fn record_timeout(&mut self, reason: TimeoutReason) { - match reason { - TimeoutReason::Global => { - let now = self.now(); - if self.time_global_start.is_none() { - self.time_global_start = Some(now); - } - self.time_call_start = Some(now); - } - TimeoutReason::Resolver => { - self.time_resolve = Some(self.now()); - } - TimeoutReason::OpenConnection => { - self.time_connect = Some(self.now()); - } - TimeoutReason::SendRequest => { - self.time_send_request = Some(self.now()); - } - TimeoutReason::SendBody => { - self.time_send_body = Some(self.now()); - } - TimeoutReason::Await100 => { - self.time_await_100 = Some(self.now()); - } - TimeoutReason::RecvResponse => { - self.time_recv_response = Some(self.now()); - } - TimeoutReason::RecvBody => { - self.time_recv_body = Some(self.now()); - } - } - } - - pub(crate) fn next_timeout(&self, reason: TimeoutReason) -> NextTimeout { - // self.time_xxx unwraps() below are OK. If the unwrap fails, we have a state - // bug where we progressed to a certain state without setting the corresponding time. - let timeouts = &self.timeouts; - - let expire_at = match reason { - TimeoutReason::Global => timeouts - .global - .map(|t| self.time_global_start.unwrap() + t.into()), - TimeoutReason::Resolver => timeouts - .resolve - .map(|t| self.time_call_start.unwrap() + t.into()), - TimeoutReason::OpenConnection => timeouts - .connect - .map(|t| self.time_resolve.unwrap() + t.into()), - TimeoutReason::SendRequest => timeouts - .send_request - .map(|t| self.time_connect.unwrap() + t.into()), - TimeoutReason::SendBody => timeouts - .send_body - .map(|t| self.time_send_request.unwrap() + t.into()), - TimeoutReason::Await100 => timeouts - .await_100 - .map(|t| self.time_send_request.unwrap() + t.into()), - TimeoutReason::RecvResponse => timeouts.recv_response.map(|t| { - // The fallback order is important. See state diagram in hoot. - self.time_send_body - .or(self.time_await_100) - .or(self.time_send_request) - .unwrap() - + t.into() - }), - TimeoutReason::RecvBody => timeouts - .recv_body - .map(|t| self.time_recv_response.unwrap() + t.into()), - } - .unwrap_or(Instant::NotHappening); - - let global_at = self.global_timeout(); - - let (at, reason) = if global_at < expire_at { - (global_at, TimeoutReason::Global) - } else { - (expire_at, reason) - }; - - let after = at.duration_since(self.now()); - - NextTimeout { after, reason } - } - - fn global_timeout(&self) -> Instant { - let global_start = self.time_global_start.unwrap(); - let call_start = self.time_call_start.unwrap(); - - let global_at = global_start - + self - .timeouts - .global - .map(|t| t.into()) - .unwrap_or(crate::transport::time::Duration::NotHappening); - - let call_at = call_start - + self - .timeouts - .per_call - .map(|t| t.into()) - .unwrap_or(crate::transport::time::Duration::NotHappening); - - global_at.min(call_at) - } - - pub(crate) fn new_call(self) -> CallTimings { - CallTimings { - timeouts: self.timeouts, - time_global_start: self.time_global_start, - current_time: self.current_time, - ..Default::default() - } - } -} - -impl fmt::Debug for CurrentTime { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_tuple("CurrentTime").finish() - } -} - -impl Default for CurrentTime { - fn default() -> Self { - Self(Arc::new(Instant::now)) - } -} diff --git a/src/error.rs b/src/error.rs index 94ce5ce6..41f1aefc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,7 +1,9 @@ -use std::{fmt, io}; +use std::io; use thiserror::Error; +use crate::Timeout; + /// Errors from ureq. #[derive(Debug, Error)] #[non_exhaustive] @@ -38,7 +40,7 @@ pub enum Error { /// /// By default no timeouts are set, which means this error can't happen. #[error("timeout: {0}")] - Timeout(TimeoutReason), + Timeout(Timeout), /// Error when resolving a hostname fails. #[error("host not found")] @@ -183,53 +185,6 @@ impl Error { } } -/// Motivation for an [`Error::Timeout`]. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[non_exhaustive] -pub enum TimeoutReason { - /// Timeout for entire call. - Global, - - /// Timeout in the resolver. - Resolver, - - /// Timeout while opening the connection. - OpenConnection, - - /// Timeout while sending the request headers. - SendRequest, - - /// Timeout when sending then request body. - SendBody, - - /// Internal value never seen outside ureq (since awaiting 100 is expected - /// to timeout). - #[doc(hidden)] - Await100, - - /// Timeout while receiving the response headers. - RecvResponse, - - /// Timeout while receiving the response body. - RecvBody, -} - -impl fmt::Display for TimeoutReason { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let r = match self { - TimeoutReason::Global => "global", - TimeoutReason::Resolver => "resolver", - TimeoutReason::OpenConnection => "open connection", - TimeoutReason::SendRequest => "send request", - TimeoutReason::SendBody => "send body", - TimeoutReason::Await100 => "await 100", - TimeoutReason::RecvResponse => "receive response", - TimeoutReason::RecvBody => "receive body", - }; - write!(f, "{}", r) - } -} - impl From for Error { fn from(e: io::Error) -> Self { let is_wrapped_ureq_error = e.get_ref().map(|x| x.is::()).unwrap_or(false); diff --git a/src/lib.rs b/src/lib.rs index 8d50537c..99f8525f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -340,6 +340,7 @@ mod proxy; mod request; mod run; mod send_body; +mod timings; mod util; pub mod middleware; @@ -355,8 +356,9 @@ mod cookies; pub use cookies::{Cookie, CookieJar}; pub use agent::Agent; -pub use error::{Error, TimeoutReason}; +pub use error::Error; pub use send_body::SendBody; +pub use timings::Timeout; /// Run a [`http::Request`]. pub fn run(request: Request) -> Result, Error> { diff --git a/src/pool.rs b/src/pool.rs index df08b07b..69f4eb27 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -6,8 +6,8 @@ use http::uri::{Authority, Scheme}; use http::Uri; use crate::proxy::Proxy; -use crate::transport::time::{Duration, Instant, NextTimeout}; -use crate::transport::{Buffers, ConnectionDetails, Connector, Transport}; +use crate::transport::time::{Duration, Instant}; +use crate::transport::{Buffers, ConnectionDetails, Connector, NextTimeout, Transport}; use crate::util::DebugAuthority; use crate::{AgentConfig, Error}; diff --git a/src/resolver.rs b/src/resolver.rs index e76f7bed..dd18b319 100644 --- a/src/resolver.rs +++ b/src/resolver.rs @@ -18,7 +18,7 @@ use http::uri::{Authority, Scheme}; use http::Uri; use smallvec::{smallvec, SmallVec}; -use crate::transport::time::NextTimeout; +use crate::transport::NextTimeout; use crate::util::{SchemeExt, UriExt}; use crate::{AgentConfig, Error}; @@ -187,7 +187,7 @@ mod test { &config, NextTimeout { after: Duration::NotHappening, - reason: crate::TimeoutReason::Global, + reason: crate::Timeout::Global, }, ) .unwrap_err(); diff --git a/src/run.rs b/src/run.rs index a5046f96..74235676 100644 --- a/src/run.rs +++ b/src/run.rs @@ -9,12 +9,12 @@ use http::uri::Scheme; use http::{HeaderValue, Request, Response, Uri}; use crate::body::ResponseInfo; -use crate::config::CallTimings; use crate::pool::Connection; +use crate::timings::CallTimings; use crate::transport::time::{Duration, Instant}; use crate::transport::ConnectionDetails; use crate::util::{DebugRequest, DebugResponse, DebugUri, HeaderMapExt, UriExt}; -use crate::{Agent, AgentConfig, Body, Error, SendBody, TimeoutReason, Timeouts}; +use crate::{Agent, AgentConfig, Body, Error, SendBody, Timeout, Timeouts}; type Flow = hoot::client::flow::Flow<(), T>; @@ -39,15 +39,15 @@ pub(crate) fn run( let mut flow = Flow::new(request)?; let (response, handler) = loop { - timings.record_timeout(TimeoutReason::Global); + timings.record_timeout(Timeout::Global); - let timeout = timings.next_timeout(TimeoutReason::Global); + let timeout = timings.next_timeout(Timeout::Global); let timed_out = match timeout.after { Duration::Exact(v) => v.is_zero(), Duration::NotHappening => false, }; if timed_out { - return Err(Error::Timeout(TimeoutReason::Global)); + return Err(Error::Timeout(Timeout::Global)); } match flow_run(agent, flow, &mut body, redirect_count, &mut timings)? { @@ -114,7 +114,7 @@ fn flow_run( redirect_count: u32, timings: &mut CallTimings, ) -> Result { - timings.record_timeout(crate::TimeoutReason::Global); + timings.record_timeout(crate::Timeout::Global); let uri = flow.uri().clone(); info!("{} {:?}", flow.method(), &DebugUri(flow.uri())); @@ -282,10 +282,10 @@ fn connect(agent: &Agent, uri: &Uri, timings: &mut CallTimings) -> Result Result Result, Error> { while flow.can_keep_await_100() { - let timeout = timings.next_timeout(TimeoutReason::Await100); + let timeout = timings.next_timeout(Timeout::Await100); if timeout.after.is_zero() { // Stop waiting for 100-continue.a @@ -355,7 +355,7 @@ fn await_100( } } - timings.record_timeout(TimeoutReason::Await100); + timings.record_timeout(Timeout::Await100); Ok(flow.proceed()) } @@ -403,11 +403,11 @@ fn send_body( output_used }; - let timeout = timings.next_timeout(TimeoutReason::SendBody); + let timeout = timings.next_timeout(Timeout::SendBody); connection.transmit_output(output_used, timeout)?; } - timings.record_timeout(TimeoutReason::SendBody); + timings.record_timeout(Timeout::SendBody); Ok(flow.proceed().unwrap()) } @@ -418,7 +418,7 @@ fn recv_response( timings: &mut CallTimings, ) -> Result<(Response<()>, RecvResponseResult<()>), Error> { let response = loop { - let timeout = timings.next_timeout(TimeoutReason::RecvResponse); + let timeout = timings.next_timeout(Timeout::RecvResponse); let made_progress = connection.await_input(timeout)?; let input = connection.buffers().input(); @@ -442,7 +442,7 @@ fn recv_response( } }; - timings.record_timeout(TimeoutReason::RecvResponse); + timings.record_timeout(Timeout::RecvResponse); Ok((response, flow.proceed().unwrap())) } @@ -528,7 +528,7 @@ impl BodyHandler { return Ok(0); } - let timeout = timings.next_timeout(TimeoutReason::RecvBody); + let timeout = timings.next_timeout(Timeout::RecvBody); let made_progress = match connection.await_input(timeout) { Ok(v) => v, @@ -568,7 +568,7 @@ impl BodyHandler { } fn ended(&mut self) -> Result<(), Error> { - self.timings.record_timeout(TimeoutReason::RecvBody); + self.timings.record_timeout(Timeout::RecvBody); let flow = self.flow.take().expect("ended() called with body"); diff --git a/src/timings.rs b/src/timings.rs new file mode 100644 index 00000000..0395a2f7 --- /dev/null +++ b/src/timings.rs @@ -0,0 +1,230 @@ +use std::fmt; +use std::sync::Arc; + +use crate::transport::time::{Duration, Instant}; +use crate::Timeouts; + +/// The various timeouts. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[non_exhaustive] +pub enum Timeout { + /// Timeout for entire call. + Global, + + /// Timeout in the resolver. + Resolve, + + /// Timeout while opening the connection. + Connect, + + /// Timeout while sending the request headers. + SendRequest, + + /// Timeout when sending then request body. + SendBody, + + /// Internal value never seen outside ureq (since awaiting 100 is expected + /// to timeout). + #[doc(hidden)] + Await100, + + /// Timeout while receiving the response headers. + RecvResponse, + + /// Timeout while receiving the response body. + RecvBody, +} + +#[derive(Debug, Default)] +pub(crate) struct CallTimings { + pub timeouts: Timeouts, + pub current_time: CurrentTime, + + pub time_global_start: Option, + pub time_call_start: Option, + pub time_resolve: Option, + pub time_connect: Option, + pub time_send_request: Option, + pub time_send_body: Option, + pub time_await_100: Option, + pub time_recv_response: Option, + pub time_recv_body: Option, +} + +#[derive(Clone)] +pub(crate) struct CurrentTime(Arc Instant + Send + Sync + 'static>); + +impl CurrentTime { + pub(crate) fn now(&self) -> Instant { + self.0() + } +} + +/// A pair of [`Duration`] and [`TimeoutReason`]. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct NextTimeout { + /// Duration until next timeout. + pub after: Duration, + /// The name of the next timeout.s + pub reason: Timeout, +} + +impl NextTimeout { + pub(crate) fn not_zero(&self) -> Option { + if self.after.is_not_happening() { + None + } else if self.after.is_zero() { + Some(Duration::from_secs(1)) + } else { + Some(self.after) + } + } +} + +impl CallTimings { + pub(crate) fn now(&self) -> Instant { + self.current_time.now() + } + + pub(crate) fn record_timeout(&mut self, reason: Timeout) { + match reason { + Timeout::Global => { + let now = self.now(); + if self.time_global_start.is_none() { + self.time_global_start = Some(now); + } + self.time_call_start = Some(now); + } + Timeout::Resolve => { + self.time_resolve = Some(self.now()); + } + Timeout::Connect => { + self.time_connect = Some(self.now()); + } + Timeout::SendRequest => { + self.time_send_request = Some(self.now()); + } + Timeout::SendBody => { + self.time_send_body = Some(self.now()); + } + Timeout::Await100 => { + self.time_await_100 = Some(self.now()); + } + Timeout::RecvResponse => { + self.time_recv_response = Some(self.now()); + } + Timeout::RecvBody => { + self.time_recv_body = Some(self.now()); + } + } + } + + pub(crate) fn next_timeout(&self, reason: Timeout) -> NextTimeout { + // self.time_xxx unwraps() below are OK. If the unwrap fails, we have a state + // bug where we progressed to a certain state without setting the corresponding time. + let timeouts = &self.timeouts; + + let expire_at = match reason { + Timeout::Global => timeouts + .global + .map(|t| self.time_global_start.unwrap() + t.into()), + Timeout::Resolve => timeouts + .resolve + .map(|t| self.time_call_start.unwrap() + t.into()), + Timeout::Connect => timeouts + .connect + .map(|t| self.time_resolve.unwrap() + t.into()), + Timeout::SendRequest => timeouts + .send_request + .map(|t| self.time_connect.unwrap() + t.into()), + Timeout::SendBody => timeouts + .send_body + .map(|t| self.time_send_request.unwrap() + t.into()), + Timeout::Await100 => timeouts + .await_100 + .map(|t| self.time_send_request.unwrap() + t.into()), + Timeout::RecvResponse => timeouts.recv_response.map(|t| { + // The fallback order is important. See state diagram in hoot. + self.time_send_body + .or(self.time_await_100) + .or(self.time_send_request) + .unwrap() + + t.into() + }), + Timeout::RecvBody => timeouts + .recv_body + .map(|t| self.time_recv_response.unwrap() + t.into()), + } + .unwrap_or(Instant::NotHappening); + + let global_at = self.global_timeout(); + + let (at, reason) = if global_at < expire_at { + (global_at, Timeout::Global) + } else { + (expire_at, reason) + }; + + let after = at.duration_since(self.now()); + + NextTimeout { after, reason } + } + + fn global_timeout(&self) -> Instant { + let global_start = self.time_global_start.unwrap(); + let call_start = self.time_call_start.unwrap(); + + let global_at = global_start + + self + .timeouts + .global + .map(|t| t.into()) + .unwrap_or(crate::transport::time::Duration::NotHappening); + + let call_at = call_start + + self + .timeouts + .per_call + .map(|t| t.into()) + .unwrap_or(crate::transport::time::Duration::NotHappening); + + global_at.min(call_at) + } + + pub(crate) fn new_call(self) -> CallTimings { + CallTimings { + timeouts: self.timeouts, + time_global_start: self.time_global_start, + current_time: self.current_time, + ..Default::default() + } + } +} + +impl fmt::Debug for CurrentTime { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("CurrentTime").finish() + } +} + +impl Default for CurrentTime { + fn default() -> Self { + Self(Arc::new(Instant::now)) + } +} + +impl fmt::Display for Timeout { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let r = match self { + Timeout::Global => "global", + Timeout::Resolve => "resolver", + Timeout::Connect => "open connection", + Timeout::SendRequest => "send request", + Timeout::SendBody => "send body", + Timeout::Await100 => "await 100", + Timeout::RecvResponse => "receive response", + Timeout::RecvBody => "receive body", + }; + write!(f, "{}", r) + } +} diff --git a/src/tls/native_tls.rs b/src/tls/native_tls.rs index 689235ff..62cf7076 100644 --- a/src/tls/native_tls.rs +++ b/src/tls/native_tls.rs @@ -4,7 +4,6 @@ use std::io::{Read, Write}; use std::sync::Arc; use crate::tls::{RootCerts, TlsProvider}; -use crate::transport::time::NextTimeout; use crate::{transport::*, Error}; use der::pem::LineEnding; use der::Document; diff --git a/src/tls/rustls.rs b/src/tls/rustls.rs index 8e5e4b67..7af9db99 100644 --- a/src/tls/rustls.rs +++ b/src/tls/rustls.rs @@ -11,9 +11,8 @@ use rustls_pki_types::{PrivateSec1KeyDer, ServerName}; use crate::tls::cert::KeyKind; use crate::tls::{RootCerts, TlsProvider}; -use crate::transport::time::NextTimeout; use crate::transport::{Buffers, ConnectionDetails, Connector, LazyBuffers}; -use crate::transport::{Transport, TransportAdapter}; +use crate::transport::{NextTimeout, Transport, TransportAdapter}; use crate::Error; use super::TlsConfig; diff --git a/src/transport/io.rs b/src/transport/io.rs index 6466cc05..ba516dde 100644 --- a/src/transport/io.rs +++ b/src/transport/io.rs @@ -1,9 +1,9 @@ use std::io; -use crate::transport::time::{Duration, NextTimeout}; -use crate::TimeoutReason; +use crate::transport::time::Duration; +use crate::Timeout; -use super::Transport; +use super::{NextTimeout, Transport}; /// Helper to turn a [`Transport`] into a std::io [`Read`](io::Read) and [`Write`](io::Write). /// @@ -21,7 +21,7 @@ impl TransportAdapter { Self { timeout: NextTimeout { after: Duration::NotHappening, - reason: TimeoutReason::Global, + reason: Timeout::Global, }, transport, } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index fe98b79e..2d4c581e 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -31,7 +31,7 @@ use crate::resolver::{ResolvedSocketAddrs, Resolver}; use crate::{AgentConfig, Error}; pub use self::tcp::TcpConnector; -use self::time::{Instant, NextTimeout}; +use self::time::Instant; mod buf; pub use buf::{Buffers, LazyBuffers}; @@ -58,6 +58,8 @@ pub use crate::proxy::ConnectProxyConnector; pub mod time; +pub use crate::timings::NextTimeout; + /// Trait for components providing some aspect of connecting. /// /// A connector instance is reused to produce multiple [`Transport`] instances (where `Transport` diff --git a/src/transport/socks.rs b/src/transport/socks.rs index 0dc07ad5..e8168e57 100644 --- a/src/transport/socks.rs +++ b/src/transport/socks.rs @@ -11,8 +11,7 @@ use crate::transport::tcp::TcpTransport; use crate::transport::LazyBuffers; use crate::Error; -use super::time::NextTimeout; -use super::{ConnectionDetails, Connector, Transport}; +use super::{ConnectionDetails, Connector, NextTimeout, Transport}; /// Connector for SOCKS proxies. /// diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index 2779f56e..5fa3e002 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -3,11 +3,11 @@ use std::net::{SocketAddr, TcpStream}; use std::{fmt, io, time}; use crate::resolver::ResolvedSocketAddrs; -use crate::transport::time::{Duration, NextTimeout}; +use crate::transport::time::Duration; use crate::util::IoResultExt; use crate::{AgentConfig, Error}; -use super::{Buffers, ConnectionDetails, Connector, LazyBuffers, Transport}; +use super::{Buffers, ConnectionDetails, Connector, LazyBuffers, NextTimeout, Transport}; #[derive(Default)] /// Connector for regular TCP sockets. diff --git a/src/transport/test.rs b/src/transport/test.rs index d6a89b47..b20ec17e 100644 --- a/src/transport/test.rs +++ b/src/transport/test.rs @@ -9,10 +9,10 @@ use std::{fmt, io, thread}; use http::{Method, Request, Uri}; -use crate::transport::time::{Duration, NextTimeout}; +use crate::transport::time::Duration; use crate::Error; -use super::{Buffers, ConnectionDetails, Connector, LazyBuffers, Transport}; +use super::{Buffers, ConnectionDetails, Connector, LazyBuffers, NextTimeout, Transport}; #[derive(Default)] pub(crate) struct TestConnector; diff --git a/src/transport/time.rs b/src/transport/time.rs index dd5f8066..be8ab114 100644 --- a/src/transport/time.rs +++ b/src/transport/time.rs @@ -4,8 +4,6 @@ use std::cmp::Ordering; use std::ops::{Add, AddAssign, Deref}; use std::time; -use crate::TimeoutReason; - /// Wrapper for [`std::time::Instant`] that provides additional time points in the past or future #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Instant { @@ -142,27 +140,6 @@ impl From for Duration { } } -/// A pair of [`Duration`] and [`TimeoutReason`]. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct NextTimeout { - /// Duration until next timeout. - pub after: Duration, - /// The name of the next timeout.s - pub reason: TimeoutReason, -} - -impl NextTimeout { - pub(crate) fn not_zero(&self) -> Option { - if self.after.is_not_happening() { - None - } else if self.after.is_zero() { - Some(Duration::from_secs(1)) - } else { - Some(self.after) - } - } -} - #[cfg(test)] mod test { use super::*; From b1e3d7aadeb272e74bfc68457262499080e965bd Mon Sep 17 00:00:00 2001 From: Martin Algesten Date: Sun, 1 Sep 2024 14:11:07 +0200 Subject: [PATCH 2/2] Rewrite CurrentTimings logic --- src/run.rs | 25 ++-- src/timings.rs | 270 ++++++++++++++++++++---------------------- src/transport/time.rs | 8 +- 3 files changed, 141 insertions(+), 162 deletions(-) diff --git a/src/run.rs b/src/run.rs index 74235676..eafc25bf 100644 --- a/src/run.rs +++ b/src/run.rs @@ -10,7 +10,7 @@ use http::{HeaderValue, Request, Response, Uri}; use crate::body::ResponseInfo; use crate::pool::Connection; -use crate::timings::CallTimings; +use crate::timings::{CallTimings, CurrentTime}; use crate::transport::time::{Duration, Instant}; use crate::transport::ConnectionDetails; use crate::util::{DebugRequest, DebugResponse, DebugUri, HeaderMapExt, UriExt}; @@ -31,16 +31,11 @@ pub(crate) fn run( .get::() .unwrap_or(&agent.config.timeouts); - let mut timings = CallTimings { - timeouts, - ..Default::default() - }; + let mut timings = CallTimings::new(timeouts, CurrentTime::default()); let mut flow = Flow::new(request)?; let (response, handler) = loop { - timings.record_timeout(Timeout::Global); - let timeout = timings.next_timeout(Timeout::Global); let timed_out = match timeout.after { Duration::Exact(v) => v.is_zero(), @@ -114,8 +109,6 @@ fn flow_run( redirect_count: u32, timings: &mut CallTimings, ) -> Result { - timings.record_timeout(crate::Timeout::Global); - let uri = flow.uri().clone(); info!("{} {:?}", flow.method(), &DebugUri(flow.uri())); @@ -285,7 +278,7 @@ fn connect(agent: &Agent, uri: &Uri, timings: &mut CallTimings) -> Result Result Result<(), Error> { - self.timings.record_timeout(Timeout::RecvBody); + self.timings.record_time(Timeout::RecvBody); let flow = self.flow.take().expect("ended() called with body"); diff --git a/src/timings.rs b/src/timings.rs index 0395a2f7..2f4179cd 100644 --- a/src/timings.rs +++ b/src/timings.rs @@ -1,16 +1,23 @@ use std::fmt; use std::sync::Arc; +use smallvec::SmallVec; + use crate::transport::time::{Duration, Instant}; use crate::Timeouts; /// The various timeouts. +/// +/// Each enum corresponds to a value in [`AgentConfig::timeouts`][crate::AgentConfig::timeouts]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] #[non_exhaustive] pub enum Timeout { - /// Timeout for entire call. + /// Timeout for entire operation. Global, + /// Timeout for the current call (when redirected). + PerCall, + /// Timeout in the resolver. Resolve, @@ -20,14 +27,14 @@ pub enum Timeout { /// Timeout while sending the request headers. SendRequest, - /// Timeout when sending then request body. - SendBody, - /// Internal value never seen outside ureq (since awaiting 100 is expected /// to timeout). #[doc(hidden)] Await100, + /// Timeout when sending then request body. + SendBody, + /// Timeout while receiving the response headers. RecvResponse, @@ -35,20 +42,124 @@ pub enum Timeout { RecvBody, } +impl Timeout { + /// Give the immediate preceeding Timeout + fn preceeding(&self) -> impl Iterator { + let prev: &[Timeout] = match self { + Timeout::Resolve => &[Timeout::PerCall], + Timeout::Connect => &[Timeout::Resolve], + Timeout::SendRequest => &[Timeout::Connect], + Timeout::Await100 => &[Timeout::SendRequest], + Timeout::SendBody => &[Timeout::SendRequest, Timeout::Await100], + Timeout::RecvResponse => &[Timeout::SendRequest, Timeout::SendBody], + Timeout::RecvBody => &[Timeout::RecvResponse], + _ => &[], + }; + + prev.iter().copied() + } + + /// All timeouts to check + fn timeouts_to_check(&self) -> impl Iterator { + // Always check Global and PerCall + self.preceeding().chain([Timeout::Global, Timeout::PerCall]) + } + + /// Get the corresponding configured timeout + fn configured_timeout(&self, timeouts: &Timeouts) -> Option { + match self { + Timeout::Global => timeouts.global, + Timeout::PerCall => timeouts.per_call, + Timeout::Resolve => timeouts.resolve, + Timeout::Connect => timeouts.connect, + Timeout::SendRequest => timeouts.send_request, + Timeout::Await100 => timeouts.await_100, + Timeout::SendBody => timeouts.send_body, + Timeout::RecvResponse => timeouts.recv_response, + Timeout::RecvBody => timeouts.recv_body, + } + .map(Into::into) + } +} + #[derive(Debug, Default)] pub(crate) struct CallTimings { - pub timeouts: Timeouts, - pub current_time: CurrentTime, - - pub time_global_start: Option, - pub time_call_start: Option, - pub time_resolve: Option, - pub time_connect: Option, - pub time_send_request: Option, - pub time_send_body: Option, - pub time_await_100: Option, - pub time_recv_response: Option, - pub time_recv_body: Option, + timeouts: Timeouts, + current_time: CurrentTime, + times: SmallVec<[(Timeout, Instant); 8]>, +} + +impl CallTimings { + pub(crate) fn new(timeouts: Timeouts, current_time: CurrentTime) -> Self { + let mut times = SmallVec::default(); + + let now = current_time.now(); + times.push((Timeout::Global, now)); + times.push((Timeout::PerCall, now)); + + CallTimings { + timeouts, + current_time, + times, + } + } + + pub(crate) fn new_call(mut self) -> CallTimings { + self.times.retain(|(t, _)| *t == Timeout::Global); + self.times.push((Timeout::PerCall, self.current_time.now())); + + CallTimings { + timeouts: self.timeouts, + current_time: self.current_time, + times: self.times, + } + } + + pub(crate) fn now(&self) -> Instant { + self.current_time.now() + } + + pub(crate) fn record_time(&mut self, timeout: Timeout) { + // Each time should only be recorded once + assert!( + self.time_of(timeout).is_none(), + "{:?} recorded more than once", + timeout + ); + + // There need to be at least one preceeding time recorded + // since it follows a graph/call tree. + let any_preceeding = timeout + .preceeding() + .filter_map(|to_check| self.time_of(to_check)) + .any(|_| true); + + assert!(any_preceeding, "{:?} has no preceeding", timeout); + + // Record the time + self.times.push((timeout, self.current_time.now())); + } + + fn time_of(&self, timeout: Timeout) -> Option { + self.times.iter().find(|x| x.0 == timeout).map(|x| x.1) + } + + pub(crate) fn next_timeout(&self, timeout: Timeout) -> NextTimeout { + let (reason, at) = timeout + .timeouts_to_check() + .filter_map(|to_check| { + let time = self.time_of(to_check)?; + let timeout = to_check.configured_timeout(&self.timeouts)?; + Some((to_check, time + timeout)) + }) + .min_by(|a, b| a.1.cmp(&b.1)) + .unwrap_or((Timeout::Global, Instant::NotHappening)); + + let now = self.now(); + let after = at.duration_since(now); + + NextTimeout { after, reason } + } } #[derive(Clone)] @@ -60,7 +171,7 @@ impl CurrentTime { } } -/// A pair of [`Duration`] and [`TimeoutReason`]. +/// A pair of [`Duration`] and [`Timeout`]. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct NextTimeout { /// Duration until next timeout. @@ -81,126 +192,6 @@ impl NextTimeout { } } -impl CallTimings { - pub(crate) fn now(&self) -> Instant { - self.current_time.now() - } - - pub(crate) fn record_timeout(&mut self, reason: Timeout) { - match reason { - Timeout::Global => { - let now = self.now(); - if self.time_global_start.is_none() { - self.time_global_start = Some(now); - } - self.time_call_start = Some(now); - } - Timeout::Resolve => { - self.time_resolve = Some(self.now()); - } - Timeout::Connect => { - self.time_connect = Some(self.now()); - } - Timeout::SendRequest => { - self.time_send_request = Some(self.now()); - } - Timeout::SendBody => { - self.time_send_body = Some(self.now()); - } - Timeout::Await100 => { - self.time_await_100 = Some(self.now()); - } - Timeout::RecvResponse => { - self.time_recv_response = Some(self.now()); - } - Timeout::RecvBody => { - self.time_recv_body = Some(self.now()); - } - } - } - - pub(crate) fn next_timeout(&self, reason: Timeout) -> NextTimeout { - // self.time_xxx unwraps() below are OK. If the unwrap fails, we have a state - // bug where we progressed to a certain state without setting the corresponding time. - let timeouts = &self.timeouts; - - let expire_at = match reason { - Timeout::Global => timeouts - .global - .map(|t| self.time_global_start.unwrap() + t.into()), - Timeout::Resolve => timeouts - .resolve - .map(|t| self.time_call_start.unwrap() + t.into()), - Timeout::Connect => timeouts - .connect - .map(|t| self.time_resolve.unwrap() + t.into()), - Timeout::SendRequest => timeouts - .send_request - .map(|t| self.time_connect.unwrap() + t.into()), - Timeout::SendBody => timeouts - .send_body - .map(|t| self.time_send_request.unwrap() + t.into()), - Timeout::Await100 => timeouts - .await_100 - .map(|t| self.time_send_request.unwrap() + t.into()), - Timeout::RecvResponse => timeouts.recv_response.map(|t| { - // The fallback order is important. See state diagram in hoot. - self.time_send_body - .or(self.time_await_100) - .or(self.time_send_request) - .unwrap() - + t.into() - }), - Timeout::RecvBody => timeouts - .recv_body - .map(|t| self.time_recv_response.unwrap() + t.into()), - } - .unwrap_or(Instant::NotHappening); - - let global_at = self.global_timeout(); - - let (at, reason) = if global_at < expire_at { - (global_at, Timeout::Global) - } else { - (expire_at, reason) - }; - - let after = at.duration_since(self.now()); - - NextTimeout { after, reason } - } - - fn global_timeout(&self) -> Instant { - let global_start = self.time_global_start.unwrap(); - let call_start = self.time_call_start.unwrap(); - - let global_at = global_start - + self - .timeouts - .global - .map(|t| t.into()) - .unwrap_or(crate::transport::time::Duration::NotHappening); - - let call_at = call_start - + self - .timeouts - .per_call - .map(|t| t.into()) - .unwrap_or(crate::transport::time::Duration::NotHappening); - - global_at.min(call_at) - } - - pub(crate) fn new_call(self) -> CallTimings { - CallTimings { - timeouts: self.timeouts, - time_global_start: self.time_global_start, - current_time: self.current_time, - ..Default::default() - } - } -} - impl fmt::Debug for CurrentTime { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("CurrentTime").finish() @@ -217,8 +208,9 @@ impl fmt::Display for Timeout { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let r = match self { Timeout::Global => "global", - Timeout::Resolve => "resolver", - Timeout::Connect => "open connection", + Timeout::PerCall => "per call", + Timeout::Resolve => "resolve", + Timeout::Connect => "connect", Timeout::SendRequest => "send request", Timeout::SendBody => "send body", Timeout::Await100 => "await 100", diff --git a/src/transport/time.rs b/src/transport/time.rs index be8ab114..21705fa8 100644 --- a/src/transport/time.rs +++ b/src/transport/time.rs @@ -1,7 +1,7 @@ //! Internal time wrappers use std::cmp::Ordering; -use std::ops::{Add, AddAssign, Deref}; +use std::ops::{Add, Deref}; use std::time; /// Wrapper for [`std::time::Instant`] that provides additional time points in the past or future @@ -89,12 +89,6 @@ impl Add for Instant { } } -impl AddAssign for Instant { - fn add_assign(&mut self, rhs: Duration) { - *self = (*self).add(rhs); - } -} - impl PartialOrd for Instant { fn partial_cmp(&self, other: &Self) -> Option { Some(Self::cmp(self, other))