diff --git a/Cargo.toml b/Cargo.toml index f227221289..10f67d95b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ enum-map = { version = "2.7", default-features = false } log = { version = "0.4", default-features = false } qlog = { version = "0.13", default-features = false } quinn-udp = { version = "0.5.6", default-features = false, features = ["direct-log"] } +regex = { version = "1.9", default-features = false, features = ["unicode-perl"] } static_assertions = { version = "1.1", default-features = false } url = { version = "2.5", default-features = false, features = ["std"] } diff --git a/neqo-bin/Cargo.toml b/neqo-bin/Cargo.toml index 9cb2a73dfa..0920ba1693 100644 --- a/neqo-bin/Cargo.toml +++ b/neqo-bin/Cargo.toml @@ -39,7 +39,7 @@ neqo-transport = { path = "./../neqo-transport" } neqo-udp = { path = "./../neqo-udp" } qlog = { workspace = true } quinn-udp = { workspace = true } -regex = { version = "1.9", default-features = false, features = ["unicode-perl"] } +regex = { workspace = true } tokio = { version = "1", default-features = false, features = ["net", "time", "macros", "rt", "rt-multi-thread"] } url = { workspace = true } diff --git a/neqo-common/Cargo.toml b/neqo-common/Cargo.toml index c3fadaf522..8335a11c12 100644 --- a/neqo-common/Cargo.toml +++ b/neqo-common/Cargo.toml @@ -29,6 +29,7 @@ windows = { version = "0.58", default-features = false, features = ["Win32_Media [dev-dependencies] test-fixture = { path = "../test-fixture" } +regex = { workspace = true } [features] ci = [] diff --git a/neqo-common/src/qlog.rs b/neqo-common/src/qlog.rs index 04651b587b..34f1e1c2ca 100644 --- a/neqo-common/src/qlog.rs +++ b/neqo-common/src/qlog.rs @@ -11,7 +11,7 @@ use std::{ io::BufWriter, path::PathBuf, rc::Rc, - time::SystemTime, + time::{Instant, SystemTime}, }; use qlog::{ @@ -95,20 +95,41 @@ impl NeqoQlog { } /// If logging enabled, closure may generate an event to be logged. - pub fn add_event(&self, f: F) + pub fn add_event_with_instant(&self, f: F, now: Instant) where F: FnOnce() -> Option, { self.add_event_with_stream(|s| { if let Some(evt) = f() { - s.add_event(evt)?; + s.add_event_with_instant(evt, now)?; } Ok(()) }); } /// If logging enabled, closure may generate an event to be logged. - pub fn add_event_data(&self, f: F) + pub fn add_event_data_with_instant(&self, f: F, now: Instant) + where + F: FnOnce() -> Option, + { + self.add_event_with_stream(|s| { + if let Some(ev_data) = f() { + s.add_event_data_with_instant(ev_data, now)?; + } + Ok(()) + }); + } + + /// If logging enabled, closure may generate an event to be logged. + /// + /// This function is similar to [`NeqoQlog::add_event_data_with_instant`], + /// but it does not take `now: Instant` as an input parameter. Instead, it + /// internally calls [`std::time::Instant::now`]. Prefer calling + /// [`NeqoQlog::add_event_data_with_instant`] when `now` is available, as it + /// ensures consistency with the current time, which might differ from + /// [`std::time::Instant::now`] (e.g., when using simulated time instead of + /// real time). + pub fn add_event_data_now(&self, f: F) where F: FnOnce() -> Option, { @@ -184,7 +205,10 @@ pub fn new_trace(role: Role) -> qlog::TraceSeq { #[cfg(test)] mod test { + use std::time::Instant; + use qlog::events::Event; + use regex::Regex; use test_fixture::EXPECTED_LOG_HEADER; const EV_DATA: qlog::events::EventData = @@ -205,15 +229,14 @@ mod test { } #[test] - fn add_event() { + fn add_event_with_instant() { let (log, contents) = test_fixture::new_neqo_qlog(); - log.add_event(|| Some(Event::with_time(1.1, EV_DATA))); + log.add_event_with_instant(|| Some(Event::with_time(0.0, EV_DATA)), Instant::now()); assert_eq!( - contents.to_string(), - format!( - "{EXPECTED_LOG_HEADER}{e}", - e = EXPECTED_LOG_EVENT.replace("\"time\":0.0,", "\"time\":1.1,") - ) + Regex::new("\"time\":[0-9].[0-9]*,") + .unwrap() + .replace(&contents.to_string(), "\"time\":0.0,"), + format!("{EXPECTED_LOG_HEADER}{EXPECTED_LOG_EVENT}"), ); } } diff --git a/neqo-http3/src/qlog.rs b/neqo-http3/src/qlog.rs index 9e2742e159..0a21697923 100644 --- a/neqo-http3/src/qlog.rs +++ b/neqo-http3/src/qlog.rs @@ -10,8 +10,11 @@ use neqo_common::qlog::NeqoQlog; use neqo_transport::StreamId; use qlog::events::{DataRecipient, EventData}; +/// Uses [`NeqoQlog::add_event_data_now`] instead of +/// [`NeqoQlog::add_event_data_with_instant`], given that `now` is not available +/// on call-site. See docs on [`NeqoQlog::add_event_data_now`] for details. pub fn h3_data_moved_up(qlog: &NeqoQlog, stream_id: StreamId, amount: usize) { - qlog.add_event_data(|| { + qlog.add_event_data_now(|| { let ev_data = EventData::DataMoved(qlog::events::quic::DataMoved { stream_id: Some(stream_id.as_u64()), offset: None, @@ -25,8 +28,11 @@ pub fn h3_data_moved_up(qlog: &NeqoQlog, stream_id: StreamId, amount: usize) { }); } +/// Uses [`NeqoQlog::add_event_data_now`] instead of +/// [`NeqoQlog::add_event_data_with_instant`], given that `now` is not available +/// on call-site. See docs on [`NeqoQlog::add_event_data_now`] for details. pub fn h3_data_moved_down(qlog: &NeqoQlog, stream_id: StreamId, amount: usize) { - qlog.add_event_data(|| { + qlog.add_event_data_now(|| { let ev_data = EventData::DataMoved(qlog::events::quic::DataMoved { stream_id: Some(stream_id.as_u64()), offset: None, diff --git a/neqo-qpack/src/qlog.rs b/neqo-qpack/src/qlog.rs index a91b9875e2..21c9b8d6aa 100644 --- a/neqo-qpack/src/qlog.rs +++ b/neqo-qpack/src/qlog.rs @@ -12,8 +12,11 @@ use qlog::events::{ EventData, RawInfo, }; +/// Uses [`NeqoQlog::add_event_data_now`] instead of +/// [`NeqoQlog::add_event_data_with_instant`], given that `now` is not available +/// on call-site. See docs on [`NeqoQlog::add_event_data_now`] for details. pub fn qpack_read_insert_count_increment_instruction(qlog: &NeqoQlog, increment: u64, data: &[u8]) { - qlog.add_event_data(|| { + qlog.add_event_data_now(|| { let raw = RawInfo { length: Some(8), payload_length: None, diff --git a/neqo-transport/src/cc/classic_cc.rs b/neqo-transport/src/cc/classic_cc.rs index 1130178bc0..0c629afb59 100644 --- a/neqo-transport/src/cc/classic_cc.rs +++ b/neqo-transport/src/cc/classic_cc.rs @@ -216,8 +216,8 @@ impl CongestionControl for ClassicCongestionControl { } if self.state.in_recovery() { - self.set_state(State::CongestionAvoidance); - qlog::metrics_updated(&self.qlog, &[QlogMetric::InRecovery(false)]); + self.set_state(State::CongestionAvoidance, now); + qlog::metrics_updated(&self.qlog, &[QlogMetric::InRecovery(false)], now); } new_acked += pkt.len(); @@ -239,7 +239,7 @@ impl CongestionControl for ClassicCongestionControl { if self.congestion_window == self.ssthresh { // This doesn't look like it is necessary, but it can happen // after persistent congestion. - self.set_state(State::CongestionAvoidance); + self.set_state(State::CongestionAvoidance, now); } } // Congestion avoidance, above the slow start threshold. @@ -276,6 +276,7 @@ impl CongestionControl for ClassicCongestionControl { QlogMetric::CongestionWindow(self.congestion_window), QlogMetric::BytesInFlight(self.bytes_in_flight), ], + now, ); qdebug!([self], "on_packets_acked this={:p}, limited=0, bytes_in_flight={}, cwnd={}, state={:?}, new_acked={}", self, self.bytes_in_flight, self.congestion_window, self.state, new_acked); } @@ -287,6 +288,7 @@ impl CongestionControl for ClassicCongestionControl { prev_largest_acked_sent: Option, pto: Duration, lost_packets: &[SentPacket], + now: Instant, ) -> bool { if lost_packets.is_empty() { return false; @@ -306,6 +308,7 @@ impl CongestionControl for ClassicCongestionControl { qlog::metrics_updated( &self.qlog, &[QlogMetric::BytesInFlight(self.bytes_in_flight)], + now, ); let is_pmtud_probe = self.pmtud.is_probe_filter(); @@ -320,12 +323,13 @@ impl CongestionControl for ClassicCongestionControl { return false; }; - let congestion = self.on_congestion_event(last_lost_packet); + let congestion = self.on_congestion_event(last_lost_packet, now); let persistent_congestion = self.detect_persistent_congestion( first_rtt_sample_time, prev_largest_acked_sent, pto, lost_packets.rev(), + now, ); qdebug!( "on_packets_lost this={:p}, bytes_in_flight={}, cwnd={}, state={:?}", @@ -341,31 +345,33 @@ impl CongestionControl for ClassicCongestionControl { /// congestion event. /// /// See . - fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool { - self.on_congestion_event(largest_acked_pkt) + fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket, now: Instant) -> bool { + self.on_congestion_event(largest_acked_pkt, now) } - fn discard(&mut self, pkt: &SentPacket) { + fn discard(&mut self, pkt: &SentPacket, now: Instant) { if pkt.cc_outstanding() { assert!(self.bytes_in_flight >= pkt.len()); self.bytes_in_flight -= pkt.len(); qlog::metrics_updated( &self.qlog, &[QlogMetric::BytesInFlight(self.bytes_in_flight)], + now, ); qtrace!([self], "Ignore pkt with size {}", pkt.len()); } } - fn discard_in_flight(&mut self) { + fn discard_in_flight(&mut self, now: Instant) { self.bytes_in_flight = 0; qlog::metrics_updated( &self.qlog, &[QlogMetric::BytesInFlight(self.bytes_in_flight)], + now, ); } - fn on_packet_sent(&mut self, pkt: &SentPacket) { + fn on_packet_sent(&mut self, pkt: &SentPacket, now: Instant) { // Record the recovery time and exit any transient state. if self.state.transient() { self.recovery_start = Some(pkt.pn()); @@ -393,6 +399,7 @@ impl CongestionControl for ClassicCongestionControl { qlog::metrics_updated( &self.qlog, &[QlogMetric::BytesInFlight(self.bytes_in_flight)], + now, ); } @@ -448,23 +455,26 @@ impl ClassicCongestionControl { self.acked_bytes } - fn set_state(&mut self, state: State) { + fn set_state(&mut self, state: State, now: Instant) { if self.state != state { qdebug!([self], "state -> {:?}", state); let old_state = self.state; - self.qlog.add_event_data(|| { - // No need to tell qlog about exit from transient states. - if old_state.transient() { - None - } else { - let ev_data = EventData::CongestionStateUpdated(CongestionStateUpdated { - old: Some(old_state.to_qlog().to_owned()), - new: state.to_qlog().to_owned(), - trigger: None, - }); - Some(ev_data) - } - }); + self.qlog.add_event_data_with_instant( + || { + // No need to tell qlog about exit from transient states. + if old_state.transient() { + None + } else { + let ev_data = EventData::CongestionStateUpdated(CongestionStateUpdated { + old: Some(old_state.to_qlog().to_owned()), + new: state.to_qlog().to_owned(), + trigger: None, + }); + Some(ev_data) + } + }, + now, + ); self.state = state; } } @@ -475,6 +485,7 @@ impl ClassicCongestionControl { prev_largest_acked_sent: Option, pto: Duration, lost_packets: impl IntoIterator, + now: Instant, ) -> bool { if first_rtt_sample_time.is_none() { return false; @@ -512,10 +523,11 @@ impl ClassicCongestionControl { qinfo!([self], "persistent congestion"); self.congestion_window = self.cwnd_min(); self.acked_bytes = 0; - self.set_state(State::PersistentCongestion); + self.set_state(State::PersistentCongestion, now); qlog::metrics_updated( &self.qlog, &[QlogMetric::CongestionWindow(self.congestion_window)], + now, ); return true; } @@ -539,7 +551,7 @@ impl ClassicCongestionControl { /// Handle a congestion event. /// Returns true if this was a true congestion event. - fn on_congestion_event(&mut self, last_packet: &SentPacket) -> bool { + fn on_congestion_event(&mut self, last_packet: &SentPacket, now: Instant) -> bool { // Start a new congestion event if lost or ECN CE marked packet was sent // after the start of the previous congestion recovery period. if !self.after_recovery_start(last_packet) { @@ -567,8 +579,9 @@ impl ClassicCongestionControl { QlogMetric::SsThresh(self.ssthresh), QlogMetric::InRecovery(true), ], + now, ); - self.set_state(State::RecoveryStart); + self.set_state(State::RecoveryStart, now); true } @@ -668,10 +681,10 @@ mod tests { persistent_expected: bool, ) { for p in lost_packets { - cc.on_packet_sent(p); + cc.on_packet_sent(p, now()); } - cc.on_packets_lost(Some(now()), None, PTO, lost_packets); + cc.on_packets_lost(Some(now()), None, PTO, lost_packets, Instant::now()); let persistent = if cc.cwnd() == reduced_cwnd { false @@ -874,18 +887,19 @@ mod tests { rtt_time: u32, lost: &[SentPacket], ) -> bool { + let now = Instant::now(); assert_eq!(cc.cwnd(), cc.cwnd_initial()); let last_ack = Some(by_pto(last_ack)); let rtt_time = Some(by_pto(rtt_time)); // Persistent congestion is never declared if the RTT time is `None`. - cc.detect_persistent_congestion(None, None, PTO, lost.iter()); + cc.detect_persistent_congestion(None, None, PTO, lost.iter(), now); assert_eq!(cc.cwnd(), cc.cwnd_initial()); - cc.detect_persistent_congestion(None, last_ack, PTO, lost.iter()); + cc.detect_persistent_congestion(None, last_ack, PTO, lost.iter(), now); assert_eq!(cc.cwnd(), cc.cwnd_initial()); - cc.detect_persistent_congestion(rtt_time, last_ack, PTO, lost.iter()); + cc.detect_persistent_congestion(rtt_time, last_ack, PTO, lost.iter(), now); cc.cwnd() == cc.cwnd_min() } @@ -1023,7 +1037,7 @@ mod tests { fn persistent_congestion_no_prev_ack_newreno() { let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]); let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)); - cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter()); + cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter(), Instant::now()); assert_eq!(cc.cwnd(), cc.cwnd_min()); } @@ -1031,7 +1045,7 @@ mod tests { fn persistent_congestion_no_prev_ack_cubic() { let lost = make_lost(&[1, PERSISTENT_CONG_THRESH + 2]); let mut cc = ClassicCongestionControl::new(Cubic::default(), Pmtud::new(IP_ADDR)); - cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter()); + cc.detect_persistent_congestion(Some(by_pto(0)), None, PTO, lost.iter(), Instant::now()); assert_eq!(cc.cwnd(), cc.cwnd_min()); } @@ -1085,7 +1099,7 @@ mod tests { cc.max_datagram_size(), ); next_pn += 1; - cc.on_packet_sent(&p); + cc.on_packet_sent(&p, now); pkts.push(p); } assert_eq!( @@ -1113,7 +1127,7 @@ mod tests { cc.max_datagram_size(), ); next_pn += 1; - cc.on_packet_sent(&p); + cc.on_packet_sent(&p, now); pkts.push(p); } assert_eq!( @@ -1163,10 +1177,10 @@ mod tests { Vec::new(), cc.max_datagram_size(), ); - cc.on_packet_sent(&p_lost); + cc.on_packet_sent(&p_lost, now); cwnd_is_default(&cc); now += PTO; - cc.on_packets_lost(Some(now), None, PTO, &[p_lost]); + cc.on_packets_lost(Some(now), None, PTO, &[p_lost], now); cwnd_is_halved(&cc); let p_not_lost = SentPacket::new( PacketType::Short, @@ -1177,7 +1191,7 @@ mod tests { Vec::new(), cc.max_datagram_size(), ); - cc.on_packet_sent(&p_not_lost); + cc.on_packet_sent(&p_not_lost, now); now += RTT; cc.on_packets_acked(&[p_not_lost], &RTT_ESTIMATE, now); cwnd_is_halved(&cc); @@ -1202,7 +1216,7 @@ mod tests { cc.max_datagram_size(), ); next_pn += 1; - cc.on_packet_sent(&p); + cc.on_packet_sent(&p, now); pkts.push(p); } assert_eq!( @@ -1236,7 +1250,7 @@ mod tests { cc.max_datagram_size(), ); next_pn += 1; - cc.on_packet_sent(&p); + cc.on_packet_sent(&p, now); pkts.push(p); } assert_eq!( @@ -1264,22 +1278,23 @@ mod tests { #[test] fn ecn_ce() { + let now = now(); let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)); let p_ce = SentPacket::new( PacketType::Short, 1, IpTosEcn::default(), - now(), + now, true, Vec::new(), cc.max_datagram_size(), ); - cc.on_packet_sent(&p_ce); + cc.on_packet_sent(&p_ce, now); cwnd_is_default(&cc); assert_eq!(cc.state, State::SlowStart); // Signal congestion (ECN CE) and thus change state to recovery start. - cc.on_ecn_ce_received(&p_ce); + cc.on_ecn_ce_received(&p_ce, now); cwnd_is_halved(&cc); assert_eq!(cc.state, State::RecoveryStart); } diff --git a/neqo-transport/src/cc/mod.rs b/neqo-transport/src/cc/mod.rs index bbb47c4fd0..6c862c146f 100644 --- a/neqo-transport/src/cc/mod.rs +++ b/neqo-transport/src/cc/mod.rs @@ -60,19 +60,20 @@ pub trait CongestionControl: Display + Debug { prev_largest_acked_sent: Option, pto: Duration, lost_packets: &[SentPacket], + now: Instant, ) -> bool; /// Returns true if the congestion window was reduced. - fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool; + fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket, now: Instant) -> bool; #[must_use] fn recovery_packet(&self) -> bool; - fn discard(&mut self, pkt: &SentPacket); + fn discard(&mut self, pkt: &SentPacket, now: Instant); - fn on_packet_sent(&mut self, pkt: &SentPacket); + fn on_packet_sent(&mut self, pkt: &SentPacket, now: Instant); - fn discard_in_flight(&mut self); + fn discard_in_flight(&mut self, now: Instant); } #[derive(Debug, Copy, Clone)] diff --git a/neqo-transport/src/cc/tests/cubic.rs b/neqo-transport/src/cc/tests/cubic.rs index 54c8c2c3c8..e62c063b90 100644 --- a/neqo-transport/src/cc/tests/cubic.rs +++ b/neqo-transport/src/cc/tests/cubic.rs @@ -53,7 +53,7 @@ fn fill_cwnd(cc: &mut ClassicCongestionControl, mut next_pn: u64, now: In Vec::new(), cc.max_datagram_size(), ); - cc.on_packet_sent(&sent); + cc.on_packet_sent(&sent, now); next_pn += 1; } next_pn @@ -83,7 +83,7 @@ fn packet_lost(cc: &mut ClassicCongestionControl, pn: u64) { Vec::new(), cc.max_datagram_size(), ); - cc.on_packets_lost(None, None, PTO, &[p_lost]); + cc.on_packets_lost(None, None, PTO, &[p_lost], now()); } fn expected_tcp_acks(cwnd_rtt_start: usize, mtu: usize) -> u64 { diff --git a/neqo-transport/src/cc/tests/new_reno.rs b/neqo-transport/src/cc/tests/new_reno.rs index 1ee8c74f67..0b4e64af55 100644 --- a/neqo-transport/src/cc/tests/new_reno.rs +++ b/neqo-transport/src/cc/tests/new_reno.rs @@ -38,18 +38,19 @@ fn cwnd_is_halved(cc: &ClassicCongestionControl) { } #[test] +#[allow(clippy::too_many_lines)] fn issue_876() { let mut cc = ClassicCongestionControl::new(NewReno::default(), Pmtud::new(IP_ADDR)); - let time_now = now(); - let time_before = time_now.checked_sub(Duration::from_millis(100)).unwrap(); - let time_after = time_now + Duration::from_millis(150); + let now = now(); + let before = now.checked_sub(Duration::from_millis(100)).unwrap(); + let after = now + Duration::from_millis(150); let sent_packets = &[ SentPacket::new( PacketType::Short, 1, IpTosEcn::default(), - time_before, + before, true, Vec::new(), cc.max_datagram_size() - 1, @@ -58,7 +59,7 @@ fn issue_876() { PacketType::Short, 2, IpTosEcn::default(), - time_before, + before, true, Vec::new(), cc.max_datagram_size() - 2, @@ -67,7 +68,7 @@ fn issue_876() { PacketType::Short, 3, IpTosEcn::default(), - time_before, + before, true, Vec::new(), cc.max_datagram_size(), @@ -76,7 +77,7 @@ fn issue_876() { PacketType::Short, 4, IpTosEcn::default(), - time_before, + before, true, Vec::new(), cc.max_datagram_size(), @@ -85,7 +86,7 @@ fn issue_876() { PacketType::Short, 5, IpTosEcn::default(), - time_before, + before, true, Vec::new(), cc.max_datagram_size(), @@ -94,7 +95,7 @@ fn issue_876() { PacketType::Short, 6, IpTosEcn::default(), - time_before, + before, true, Vec::new(), cc.max_datagram_size(), @@ -103,7 +104,7 @@ fn issue_876() { PacketType::Short, 7, IpTosEcn::default(), - time_after, + after, true, Vec::new(), cc.max_datagram_size() - 3, @@ -112,13 +113,13 @@ fn issue_876() { // Send some more packets so that the cc is not app-limited. for p in &sent_packets[..6] { - cc.on_packet_sent(p); + cc.on_packet_sent(p, now); } assert_eq!(cc.acked_bytes(), 0); cwnd_is_default(&cc); assert_eq!(cc.bytes_in_flight(), 6 * cc.max_datagram_size() - 3); - cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[0..1]); + cc.on_packets_lost(Some(now), None, PTO, &sent_packets[0..1], now); // We are now in recovery assert!(cc.recovery_packet()); @@ -127,20 +128,20 @@ fn issue_876() { assert_eq!(cc.bytes_in_flight(), 5 * cc.max_datagram_size() - 2); // Send a packet after recovery starts - cc.on_packet_sent(&sent_packets[6]); + cc.on_packet_sent(&sent_packets[6], now); assert!(!cc.recovery_packet()); cwnd_is_halved(&cc); assert_eq!(cc.acked_bytes(), 0); assert_eq!(cc.bytes_in_flight(), 6 * cc.max_datagram_size() - 5); // and ack it. cwnd increases slightly - cc.on_packets_acked(&sent_packets[6..], &RTT_ESTIMATE, time_now); + cc.on_packets_acked(&sent_packets[6..], &RTT_ESTIMATE, now); assert_eq!(cc.acked_bytes(), sent_packets[6].len()); cwnd_is_halved(&cc); assert_eq!(cc.bytes_in_flight(), 5 * cc.max_datagram_size() - 2); // Packet from before is lost. Should not hurt cwnd. - cc.on_packets_lost(Some(time_now), None, PTO, &sent_packets[1..2]); + cc.on_packets_lost(Some(now), None, PTO, &sent_packets[1..2], now); assert!(!cc.recovery_packet()); assert_eq!(cc.acked_bytes(), sent_packets[6].len()); cwnd_is_halved(&cc); @@ -169,7 +170,7 @@ fn issue_1465() { }; let mut send_next = |cc: &mut ClassicCongestionControl, now| { let p = next_packet(now); - cc.on_packet_sent(&p); + cc.on_packet_sent(&p, now); p }; @@ -184,7 +185,7 @@ fn issue_1465() { // advance one rtt to detect lost packet there this simplifies the timers, because // on_packet_loss would only be called after RTO, but that is not relevant to the problem now += RTT; - cc.on_packets_lost(Some(now), None, PTO, &[p1]); + cc.on_packets_lost(Some(now), None, PTO, &[p1], now); // We are now in recovery assert!(cc.recovery_packet()); @@ -193,7 +194,7 @@ fn issue_1465() { assert_eq!(cc.bytes_in_flight(), 2 * cc.max_datagram_size()); // Don't reduce the cwnd again on second packet loss - cc.on_packets_lost(Some(now), None, PTO, &[p3]); + cc.on_packets_lost(Some(now), None, PTO, &[p3], now); assert_eq!(cc.acked_bytes(), 0); cwnd_is_halved(&cc); // still the same as after first packet loss assert_eq!(cc.bytes_in_flight(), cc.max_datagram_size()); @@ -206,7 +207,7 @@ fn issue_1465() { // send out recovery packet and get it acked to get out of recovery state let p4 = send_next(&mut cc, now); - cc.on_packet_sent(&p4); + cc.on_packet_sent(&p4, now); now += RTT; cc.on_packets_acked(&[p4], &RTT_ESTIMATE, now); @@ -216,7 +217,7 @@ fn issue_1465() { now += RTT; let cur_cwnd = cc.cwnd(); - cc.on_packets_lost(Some(now), None, PTO, &[p5]); + cc.on_packets_lost(Some(now), None, PTO, &[p5], now); // go back into recovery assert!(cc.recovery_packet()); @@ -225,6 +226,6 @@ fn issue_1465() { assert_eq!(cc.bytes_in_flight(), 2 * cc.max_datagram_size()); // this shouldn't introduce further cwnd reduction, but it did before https://github.com/mozilla/neqo/pull/1465 - cc.on_packets_lost(Some(now), None, PTO, &[p6]); + cc.on_packets_lost(Some(now), None, PTO, &[p6], now); assert_eq!(cc.cwnd(), cur_cwnd / 2); } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index b6b6b15762..94986b9b68 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -921,7 +921,7 @@ impl Connection { State::Init => { // We have not even sent anything just close the connection without sending any // error. This may happen when client_start fails. - self.set_state(State::Closed(error)); + self.set_state(State::Closed(error), now); } State::WaitInitial => { // We don't have any state yet, so don't bother with @@ -930,22 +930,25 @@ impl Connection { self.state_signaling .close(path, error.clone(), frame_type, msg); } - self.set_state(State::Closed(error)); + self.set_state(State::Closed(error), now); } _ => { if let Some(path) = path.or_else(|| self.paths.primary()) { self.state_signaling .close(path, error.clone(), frame_type, msg); if matches!(v, Error::KeysExhausted) { - self.set_state(State::Closed(error)); + self.set_state(State::Closed(error), now); } else { - self.set_state(State::Closing { - error, - timeout: self.get_closing_period_time(now), - }); + self.set_state( + State::Closing { + error, + timeout: self.get_closing_period_time(now), + }, + now, + ); } } else { - self.set_state(State::Closed(error)); + self.set_state(State::Closed(error), now); } } } @@ -967,7 +970,7 @@ impl Connection { State::Closing { error, timeout } | State::Draining { error, timeout } => { if *timeout <= now { let st = State::Closed(error.clone()); - self.set_state(st); + self.set_state(st, now); qinfo!("Closing timer expired"); return; } @@ -982,7 +985,10 @@ impl Connection { let pto = self.pto(); if self.idle_timeout.expired(now, pto) { qinfo!([self], "idle timeout expired"); - self.set_state(State::Closed(CloseReason::Transport(Error::IdleTimeout))); + self.set_state( + State::Closed(CloseReason::Transport(Error::IdleTimeout)), + now, + ); return; } @@ -999,7 +1005,7 @@ impl Connection { if let Some(path) = self.paths.primary() { let lost = self.loss_recovery.timeout(&path, now); self.handle_lost_packets(&lost); - qlog::packets_lost(&self.qlog, &lost); + qlog::packets_lost(&self.qlog, &lost, now); } if self.release_resumption_token_timer.is_some() { @@ -1264,10 +1270,13 @@ impl Connection { // indicate that there is a stateless reset present. qdebug!([self], "Stateless reset: {}", hex(&d[d.len() - 16..])); self.state_signaling.reset(); - self.set_state(State::Draining { - error: CloseReason::Transport(Error::StatelessReset), - timeout: self.get_closing_period_time(now), - }); + self.set_state( + State::Draining { + error: CloseReason::Transport(Error::StatelessReset), + timeout: self.get_closing_period_time(now), + }, + now, + ); Err(Error::StatelessReset) } else { Ok(()) @@ -1339,14 +1348,16 @@ impl Connection { self.conn_params.get_versions().all(), supported, version, + now, ); Ok(()) } else { qinfo!([self], "Version negotiation: failed with {:?}", supported); // This error goes straight to closed. - self.set_state(State::Closed(CloseReason::Transport( - Error::VersionNegotiation, - ))); + self.set_state( + State::Closed(CloseReason::Transport(Error::VersionNegotiation)), + now, + ); Err(Error::VersionNegotiation) } } @@ -1399,7 +1410,7 @@ impl Connection { let dcid = ConnectionId::from(packet.dcid()); self.crypto.states.init_server(version, &dcid)?; self.original_destination_cid = Some(dcid); - self.set_state(State::WaitInitial); + self.set_state(State::WaitInitial, now); // We need to make sure that we set this transport parameter. // This has to happen prior to processing the packet so that @@ -1606,7 +1617,7 @@ impl Connection { neqo_common::write_item_to_fuzzing_corpus(target, &payload[..]); } - qlog::packet_received(&self.qlog, &packet, &payload); + qlog::packet_received(&self.qlog, &packet, &payload, now); let space = PacketNumberSpace::from(payload.packet_type()); if let Some(space) = self.acks.get_mut(space) { if space.is_duplicate(payload.pn()) { @@ -1659,7 +1670,7 @@ impl Connection { // the rest of the datagram on the floor, but don't generate an error. self.check_stateless_reset(path, &d, dcid.is_none(), now)?; self.stats.borrow_mut().pkt_dropped("Decryption failure"); - qlog::packet_dropped(&self.qlog, &packet); + qlog::packet_dropped(&self.qlog, &packet, now); } } slc = remainder; @@ -1759,6 +1770,7 @@ impl Connection { .unwrap() .clone(), ), + now, ); if self.role == Role::Client { path.borrow_mut().set_valid(now); @@ -1766,13 +1778,13 @@ impl Connection { } /// If the path isn't permanent, assign it a connection ID to make it so. - fn ensure_permanent(&mut self, path: &PathRef) -> Res<()> { + fn ensure_permanent(&mut self, path: &PathRef, now: Instant) -> Res<()> { if self.paths.is_temporary(path) { // If there isn't a connection ID to use for this path, the packet // will be processed, but it won't be attributed to a path. That means // no path probes or PATH_RESPONSE. But it's not fatal. if let Some(cid) = self.connection_ids.next() { - self.paths.make_permanent(path, None, cid); + self.paths.make_permanent(path, None, cid, now); Ok(()) } else if let Some(primary) = self.paths.primary() { if primary @@ -1781,7 +1793,7 @@ impl Connection { .map_or(true, |id| id.is_empty()) { self.paths - .make_permanent(path, None, ConnectionIdEntry::empty_remote()); + .make_permanent(path, None, ConnectionIdEntry::empty_remote(), now); Ok(()) } else { qtrace!([self], "Unable to make path permanent: {}", path.borrow()); @@ -1808,7 +1820,7 @@ impl Connection { self.setup_handshake_path(path, now); } else { // Otherwise try to get a usable connection ID. - mem::drop(self.ensure_permanent(path)); + mem::drop(self.ensure_permanent(path, now)); } } } @@ -1843,9 +1855,9 @@ impl Connection { self.stats.borrow().frame_rx.crypto > 0 }; if got_version { - self.set_state(State::Handshaking); + self.set_state(State::Handshaking, now); } else { - self.set_state(State::WaitVersion); + self.set_state(State::WaitVersion, now); } } @@ -1905,7 +1917,7 @@ impl Connection { self.conn_params.pacing_enabled(), now, ); - self.ensure_permanent(&path)?; + self.ensure_permanent(&path, now)?; qinfo!( [self], "Migrate to {} probe {}", @@ -1981,7 +1993,7 @@ impl Connection { return; } - if self.ensure_permanent(path).is_ok() { + if self.ensure_permanent(path, now).is_ok() { self.paths .handle_migration(path, d.source(), now, &mut self.stats.borrow_mut()); } else { @@ -2441,6 +2453,7 @@ impl Connection { pn, builder.len() - header_start + aead_expansion, &builder.as_ref()[payload_start..], + now, ); self.stats.borrow_mut().packets_tx += 1; @@ -2466,7 +2479,7 @@ impl Connection { ); if padded { needs_padding = false; - self.loss_recovery.on_packet_sent(path, sent); + self.loss_recovery.on_packet_sent(path, sent, now); } else if pt == PacketType::Initial && (self.role == Role::Client || ack_eliciting) { // Packets containing Initial packets might need padding, and we want to // track that padding along with the Initial packet. So defer tracking. @@ -2476,7 +2489,7 @@ impl Connection { if pt == PacketType::Handshake && self.role == Role::Client { needs_padding = false; } - self.loss_recovery.on_packet_sent(path, sent); + self.loss_recovery.on_packet_sent(path, sent, now); } if *space == PacketNumberSpace::Handshake @@ -2508,7 +2521,7 @@ impl Connection { // packet, which is why we don't increase `frame_tx.padding` count here. packets.resize(profile.limit(), 0); } - self.loss_recovery.on_packet_sent(path, initial); + self.loss_recovery.on_packet_sent(path, initial, now); } path.borrow_mut().add_sent(packets.len()); Ok(SendOption::Yes( @@ -2542,12 +2555,16 @@ impl Connection { qdebug!([self], "client_start"); debug_assert_eq!(self.role, Role::Client); if let Some(path) = self.paths.primary() { - qlog::client_connection_started(&self.qlog, &path); + qlog::client_connection_started(&self.qlog, &path, now); } - qlog::client_version_information_initiated(&self.qlog, self.conn_params.get_versions()); + qlog::client_version_information_initiated( + &self.qlog, + self.conn_params.get_versions(), + now, + ); self.handshake(now, self.version, PacketNumberSpace::Initial, None)?; - self.set_state(State::WaitInitial); + self.set_state(State::WaitInitial, now); self.zero_rtt_state = if self.crypto.enable_0rtt(self.version, self.role)? { qdebug!([self], "Enabled 0-RTT"); ZeroRttState::Sending @@ -2568,9 +2585,9 @@ impl Connection { let timeout = self.get_closing_period_time(now); if let Some(path) = self.paths.primary() { self.state_signaling.close(path, error.clone(), 0, msg); - self.set_state(State::Closing { error, timeout }); + self.set_state(State::Closing { error, timeout }, now); } else { - self.set_state(State::Closed(error)); + self.set_state(State::Closed(error), now); } } @@ -2600,7 +2617,7 @@ impl Connection { } /// Process the final set of transport parameters. - fn process_tps(&mut self) -> Res<()> { + fn process_tps(&mut self, now: Instant) -> Res<()> { self.validate_cids()?; self.validate_versions()?; { @@ -2645,7 +2662,7 @@ impl Connection { self.cid_manager.set_limit(max_active_cids); } self.set_initial_limits(); - qlog::connection_tparams_set(&self.qlog, &self.tps.borrow()); + qlog::connection_tparams_set(&self.qlog, &self.tps.borrow(), now); Ok(()) } @@ -2848,8 +2865,8 @@ impl Connection { Ok(()) } - fn set_confirmed(&mut self) -> Res<()> { - self.set_state(State::Confirmed); + fn set_confirmed(&mut self, now: Instant) -> Res<()> { + self.set_state(State::Confirmed, now); if self.conn_params.pmtud_enabled() { self.paths .primary() @@ -2970,7 +2987,7 @@ impl Connection { self.stats.borrow_mut().frame_rx.path_challenge += 1; // If we were challenged, try to make the path permanent. // Report an error if we don't have enough connection IDs. - self.ensure_permanent(path)?; + self.ensure_permanent(path, now)?; path.borrow_mut().challenged(data); } Frame::PathResponse { data } => { @@ -3012,17 +3029,20 @@ impl Connection { let error = CloseReason::Transport(detail); self.state_signaling .drain(Rc::clone(path), error.clone(), frame_type, ""); - self.set_state(State::Draining { - error, - timeout: self.get_closing_period_time(now), - }); + self.set_state( + State::Draining { + error, + timeout: self.get_closing_period_time(now), + }, + now, + ); } Frame::HandshakeDone => { self.stats.borrow_mut().frame_rx.handshake_done += 1; if self.role == Role::Server || !self.state.connected() { return Err(Error::ProtocolViolation); } - self.set_confirmed()?; + self.set_confirmed(now)?; self.discard_keys(PacketNumberSpace::Handshake, now); self.migrate_to_preferred_address(now)?; } @@ -3140,7 +3160,7 @@ impl Connection { } } self.handle_lost_packets(&lost_packets); - qlog::packets_lost(&self.qlog, &lost_packets); + qlog::packets_lost(&self.qlog, &lost_packets, now); let stats = &mut self.stats.borrow_mut().frame_rx; stats.ack += 1; if let Some(largest_acknowledged) = largest_acknowledged { @@ -3182,7 +3202,7 @@ impl Connection { let path = self.paths.primary().ok_or(Error::NoAvailablePath)?; path.borrow_mut().set_valid(now); // Generate a qlog event that the server connection started. - qlog::server_connection_started(&self.qlog, &path); + qlog::server_connection_started(&self.qlog, &path, now); } else { self.zero_rtt_state = if self .crypto @@ -3202,8 +3222,8 @@ impl Connection { let pto = self.pto(); self.crypto .install_application_keys(self.version, now + pto)?; - self.process_tps()?; - self.set_state(State::Connected); + self.process_tps(now)?; + self.set_state(State::Connected, now); self.create_resumption_token(now); self.saved_datagrams .make_available(CryptoSpace::ApplicationData); @@ -3215,13 +3235,13 @@ impl Connection { .resumed(); if self.role == Role::Server { self.state_signaling.handshake_done(); - self.set_confirmed()?; + self.set_confirmed(now)?; } qinfo!([self], "Connection established"); Ok(()) } - fn set_state(&mut self, state: State) { + fn set_state(&mut self, state: State, now: Instant) { if state > self.state { qdebug!([self], "State change from {:?} -> {:?}", self.state, state); self.state = state.clone(); @@ -3229,7 +3249,7 @@ impl Connection { self.streams.clear_streams(); } self.events.connection_state_change(state); - qlog::connection_state_updated(&self.qlog, &self.state); + qlog::connection_state_updated(&self.qlog, &self.state, now); } else if mem::discriminant(&state) != mem::discriminant(&self.state) { // Only tolerate a regression in state if the new state is closing // and the connection is already closed. diff --git a/neqo-transport/src/path.rs b/neqo-transport/src/path.rs index 1f94b3205e..50740444d9 100644 --- a/neqo-transport/src/path.rs +++ b/neqo-transport/src/path.rs @@ -168,6 +168,7 @@ impl Paths { path: &PathRef, local_cid: Option, remote_cid: RemoteConnectionIdEntry, + now: Instant, ) { debug_assert!(self.is_temporary(path)); @@ -195,7 +196,7 @@ impl Paths { path.borrow_mut().make_permanent(local_cid, remote_cid); self.paths.push(Rc::clone(path)); if self.primary.is_none() { - assert!(self.select_primary(path).is_none()); + assert!(self.select_primary(path, now).is_none()); } } @@ -203,10 +204,10 @@ impl Paths { /// Using the old path is only necessary if this change in path is a reaction /// to a migration from a peer, in which case the old path needs to be probed. #[must_use] - fn select_primary(&mut self, path: &PathRef) -> Option { + fn select_primary(&mut self, path: &PathRef, now: Instant) -> Option { qdebug!([path.borrow()], "set as primary path"); let old_path = self.primary.replace(Rc::clone(path)).inspect(|old| { - old.borrow_mut().set_primary(false); + old.borrow_mut().set_primary(false, now); }); // Swap the primary path into slot 0, so that it is protected from eviction. @@ -218,7 +219,7 @@ impl Paths { .expect("migration target should be permanent"); self.paths.swap(0, idx); - path.borrow_mut().set_primary(true); + path.borrow_mut().set_primary(true, now); old_path } @@ -242,7 +243,7 @@ impl Paths { path.borrow_mut().set_ecn_baseline(baseline); if force || path.borrow().is_valid() { path.borrow_mut().set_valid(now); - mem::drop(self.select_primary(path)); + mem::drop(self.select_primary(path, now)); self.migration_target = None; } else { self.migration_target = Some(Rc::clone(path)); @@ -285,7 +286,7 @@ impl Paths { // Need a clone as `fallback` is borrowed from `self`. let path = Rc::clone(fallback); qinfo!([path.borrow()], "Failing over after primary path failed"); - mem::drop(self.select_primary(&path)); + mem::drop(self.select_primary(&path, now)); true } else { false @@ -328,7 +329,7 @@ impl Paths { return; } - if let Some(old_path) = self.select_primary(path) { + if let Some(old_path) = self.select_primary(path, now) { // Need to probe the old path if the peer migrates. old_path.borrow_mut().probe(stats); // TODO(mt) - suppress probing if the path was valid within 3PTO. @@ -366,7 +367,7 @@ impl Paths { .map_or(false, |target| Rc::ptr_eq(target, p)) { let primary = self.migration_target.take(); - mem::drop(self.select_primary(&primary.unwrap())); + mem::drop(self.select_primary(&primary.unwrap(), now)); return true; } break; @@ -637,12 +638,12 @@ impl Path { } /// Set whether this path is primary. - pub(crate) fn set_primary(&mut self, primary: bool) { + pub(crate) fn set_primary(&mut self, primary: bool, now: Instant) { qtrace!([self], "Make primary {}", primary); debug_assert!(self.remote_cid.is_some()); self.primary = primary; if !primary { - self.sender.discard_in_flight(); + self.sender.discard_in_flight(now); } } @@ -958,11 +959,11 @@ impl Path { } /// Record a packet as having been sent on this path. - pub fn packet_sent(&mut self, sent: &mut SentPacket) { + pub fn packet_sent(&mut self, sent: &mut SentPacket, now: Instant) { if !self.is_primary() { sent.clear_primary_path(); } - self.sender.on_packet_sent(sent, self.rtt.estimate()); + self.sender.on_packet_sent(sent, self.rtt.estimate(), now); } /// Discard a packet that previously might have been in-flight. @@ -988,7 +989,7 @@ impl Path { ); } - self.sender.discard(sent); + self.sender.discard(sent, now); } /// Record packets as acknowledged with the sender. @@ -1005,7 +1006,7 @@ impl Path { if ecn_ce_received { let cwnd_reduced = self .sender - .on_ecn_ce_received(acked_pkts.first().expect("must be there")); + .on_ecn_ce_received(acked_pkts.first().expect("must be there"), now); if cwnd_reduced { self.rtt.update_ack_delay(self.sender.cwnd(), self.plpmtu()); } diff --git a/neqo-transport/src/qlog.rs b/neqo-transport/src/qlog.rs index fa127212f0..07a7db1846 100644 --- a/neqo-transport/src/qlog.rs +++ b/neqo-transport/src/qlog.rs @@ -8,7 +8,7 @@ use std::{ ops::{Deref, RangeInclusive}, - time::Duration, + time::{Duration, Instant}, }; use neqo_common::{hex, qinfo, qlog::NeqoQlog, Decoder, IpTosEcn}; @@ -33,8 +33,8 @@ use crate::{ version::{Version, VersionConfig, WireVersion}, }; -pub fn connection_tparams_set(qlog: &NeqoQlog, tph: &TransportParametersHandler) { - qlog.add_event_data(|| { +pub fn connection_tparams_set(qlog: &NeqoQlog, tph: &TransportParametersHandler, now: Instant) { + qlog.add_event_data_with_instant(|| { let remote = tph.remote(); #[allow(clippy::cast_possible_truncation)] // Nope. let ev_data = EventData::TransportParametersSet( @@ -79,72 +79,86 @@ pub fn connection_tparams_set(qlog: &NeqoQlog, tph: &TransportParametersHandler) }); Some(ev_data) - }); + }, now); } -pub fn server_connection_started(qlog: &NeqoQlog, path: &PathRef) { - connection_started(qlog, path); +pub fn server_connection_started(qlog: &NeqoQlog, path: &PathRef, now: Instant) { + connection_started(qlog, path, now); } -pub fn client_connection_started(qlog: &NeqoQlog, path: &PathRef) { - connection_started(qlog, path); +pub fn client_connection_started(qlog: &NeqoQlog, path: &PathRef, now: Instant) { + connection_started(qlog, path, now); } -fn connection_started(qlog: &NeqoQlog, path: &PathRef) { - qlog.add_event_data(|| { - let p = path.deref().borrow(); - let ev_data = EventData::ConnectionStarted(ConnectionStarted { - ip_version: if p.local_address().ip().is_ipv4() { - Some("ipv4".into()) - } else { - Some("ipv6".into()) - }, - src_ip: format!("{}", p.local_address().ip()), - dst_ip: format!("{}", p.remote_address().ip()), - protocol: Some("QUIC".into()), - src_port: p.local_address().port().into(), - dst_port: p.remote_address().port().into(), - src_cid: p.local_cid().map(ToString::to_string), - dst_cid: p.remote_cid().map(ToString::to_string), - }); +fn connection_started(qlog: &NeqoQlog, path: &PathRef, now: Instant) { + qlog.add_event_data_with_instant( + || { + let p = path.deref().borrow(); + let ev_data = EventData::ConnectionStarted(ConnectionStarted { + ip_version: if p.local_address().ip().is_ipv4() { + Some("ipv4".into()) + } else { + Some("ipv6".into()) + }, + src_ip: format!("{}", p.local_address().ip()), + dst_ip: format!("{}", p.remote_address().ip()), + protocol: Some("QUIC".into()), + src_port: p.local_address().port().into(), + dst_port: p.remote_address().port().into(), + src_cid: p.local_cid().map(ToString::to_string), + dst_cid: p.remote_cid().map(ToString::to_string), + }); - Some(ev_data) - }); + Some(ev_data) + }, + now, + ); } -pub fn connection_state_updated(qlog: &NeqoQlog, new: &State) { - qlog.add_event_data(|| { - let ev_data = EventData::ConnectionStateUpdated(ConnectionStateUpdated { - old: None, - new: match new { - State::Init | State::WaitInitial => ConnectionState::Attempted, - State::WaitVersion | State::Handshaking => ConnectionState::HandshakeStarted, - State::Connected => ConnectionState::HandshakeCompleted, - State::Confirmed => ConnectionState::HandshakeConfirmed, - State::Closing { .. } => ConnectionState::Closing, - State::Draining { .. } => ConnectionState::Draining, - State::Closed { .. } => ConnectionState::Closed, - }, - }); +#[allow(clippy::similar_names)] +pub fn connection_state_updated(qlog: &NeqoQlog, new: &State, now: Instant) { + qlog.add_event_data_with_instant( + || { + let ev_data = EventData::ConnectionStateUpdated(ConnectionStateUpdated { + old: None, + new: match new { + State::Init | State::WaitInitial => ConnectionState::Attempted, + State::WaitVersion | State::Handshaking => ConnectionState::HandshakeStarted, + State::Connected => ConnectionState::HandshakeCompleted, + State::Confirmed => ConnectionState::HandshakeConfirmed, + State::Closing { .. } => ConnectionState::Closing, + State::Draining { .. } => ConnectionState::Draining, + State::Closed { .. } => ConnectionState::Closed, + }, + }); - Some(ev_data) - }); + Some(ev_data) + }, + now, + ); } -pub fn client_version_information_initiated(qlog: &NeqoQlog, version_config: &VersionConfig) { - qlog.add_event_data(|| { - Some(EventData::VersionInformation(VersionInformation { - client_versions: Some( - version_config - .all() - .iter() - .map(|v| format!("{:02x}", v.wire_version())) - .collect(), - ), - server_versions: None, - chosen_version: Some(format!("{:02x}", version_config.initial().wire_version())), - })) - }); +pub fn client_version_information_initiated( + qlog: &NeqoQlog, + version_config: &VersionConfig, + now: Instant, +) { + qlog.add_event_data_with_instant( + || { + Some(EventData::VersionInformation(VersionInformation { + client_versions: Some( + version_config + .all() + .iter() + .map(|v| format!("{:02x}", v.wire_version())) + .collect(), + ), + server_versions: None, + chosen_version: Some(format!("{:02x}", version_config.initial().wire_version())), + })) + }, + now, + ); } pub fn client_version_information_negotiated( @@ -152,96 +166,119 @@ pub fn client_version_information_negotiated( client: &[Version], server: &[WireVersion], chosen: Version, + now: Instant, ) { - qlog.add_event_data(|| { - Some(EventData::VersionInformation(VersionInformation { - client_versions: Some( - client - .iter() - .map(|v| format!("{:02x}", v.wire_version())) - .collect(), - ), - server_versions: Some(server.iter().map(|v| format!("{v:02x}")).collect()), - chosen_version: Some(format!("{:02x}", chosen.wire_version())), - })) - }); + qlog.add_event_data_with_instant( + || { + Some(EventData::VersionInformation(VersionInformation { + client_versions: Some( + client + .iter() + .map(|v| format!("{:02x}", v.wire_version())) + .collect(), + ), + server_versions: Some(server.iter().map(|v| format!("{v:02x}")).collect()), + chosen_version: Some(format!("{:02x}", chosen.wire_version())), + })) + }, + now, + ); } -pub fn server_version_information_failed(qlog: &NeqoQlog, server: &[Version], client: WireVersion) { - qlog.add_event_data(|| { - Some(EventData::VersionInformation(VersionInformation { - client_versions: Some(vec![format!("{client:02x}")]), - server_versions: Some( - server - .iter() - .map(|v| format!("{:02x}", v.wire_version())) - .collect(), - ), - chosen_version: None, - })) - }); +pub fn server_version_information_failed( + qlog: &NeqoQlog, + server: &[Version], + client: WireVersion, + now: Instant, +) { + qlog.add_event_data_with_instant( + || { + Some(EventData::VersionInformation(VersionInformation { + client_versions: Some(vec![format!("{client:02x}")]), + server_versions: Some( + server + .iter() + .map(|v| format!("{:02x}", v.wire_version())) + .collect(), + ), + chosen_version: None, + })) + }, + now, + ); } -pub fn packet_sent(qlog: &NeqoQlog, pt: PacketType, pn: PacketNumber, plen: usize, body: &[u8]) { - qlog.add_event_with_stream(|stream| { - let mut d = Decoder::from(body); - let header = PacketHeader::with_type(pt.into(), Some(pn), None, None, None); - let raw = RawInfo { - length: Some(plen as u64), - payload_length: None, - data: None, - }; - - let mut frames = SmallVec::new(); - while d.remaining() > 0 { - if let Ok(f) = Frame::decode(&mut d) { - frames.push(QuicFrame::from(f)); - } else { - qinfo!("qlog: invalid frame"); - break; +pub fn packet_sent( + qlog: &NeqoQlog, + pt: PacketType, + pn: PacketNumber, + plen: usize, + body: &[u8], + now: Instant, +) { + qlog.add_event_data_with_instant( + || { + let mut d = Decoder::from(body); + let header = PacketHeader::with_type(pt.into(), Some(pn), None, None, None); + let raw = RawInfo { + length: Some(plen as u64), + payload_length: None, + data: None, + }; + + let mut frames = SmallVec::new(); + while d.remaining() > 0 { + if let Ok(f) = Frame::decode(&mut d) { + frames.push(QuicFrame::from(f)); + } else { + qinfo!("qlog: invalid frame"); + break; + } } - } - let ev_data = EventData::PacketSent(PacketSent { - header, - frames: Some(frames), - is_coalesced: None, - retry_token: None, - stateless_reset_token: None, - supported_versions: None, - raw: Some(raw), - datagram_id: None, - send_at_time: None, - trigger: None, - }); - - stream.add_event_data_now(ev_data) - }); + Some(EventData::PacketSent(PacketSent { + header, + frames: Some(frames), + is_coalesced: None, + retry_token: None, + stateless_reset_token: None, + supported_versions: None, + raw: Some(raw), + datagram_id: None, + send_at_time: None, + trigger: None, + })) + }, + now, + ); } -pub fn packet_dropped(qlog: &NeqoQlog, public_packet: &PublicPacket) { - qlog.add_event_data(|| { - let header = - PacketHeader::with_type(public_packet.packet_type().into(), None, None, None, None); - let raw = RawInfo { - length: Some(public_packet.len() as u64), - payload_length: None, - data: None, - }; - - let ev_data = EventData::PacketDropped(PacketDropped { - header: Some(header), - raw: Some(raw), - datagram_id: None, - details: None, - trigger: None, - }); +pub fn packet_dropped(qlog: &NeqoQlog, public_packet: &PublicPacket, now: Instant) { + qlog.add_event_data_with_instant( + || { + let header = + PacketHeader::with_type(public_packet.packet_type().into(), None, None, None, None); + let raw = RawInfo { + length: Some(public_packet.len() as u64), + payload_length: None, + data: None, + }; - Some(ev_data) - }); + let ev_data = EventData::PacketDropped(PacketDropped { + header: Some(header), + raw: Some(raw), + datagram_id: None, + details: None, + trigger: None, + }); + + Some(ev_data) + }, + now, + ); } -pub fn packets_lost(qlog: &NeqoQlog, pkts: &[SentPacket]) { +pub fn packets_lost(qlog: &NeqoQlog, pkts: &[SentPacket], now: Instant) { qlog.add_event_with_stream(|stream| { for pkt in pkts { let header = @@ -253,54 +290,60 @@ pub fn packets_lost(qlog: &NeqoQlog, pkts: &[SentPacket]) { frames: None, }); - stream.add_event_data_now(ev_data)?; + stream.add_event_data_with_instant(ev_data, now)?; } Ok(()) }); } -pub fn packet_received(qlog: &NeqoQlog, public_packet: &PublicPacket, payload: &DecryptedPacket) { - qlog.add_event_with_stream(|stream| { - let mut d = Decoder::from(&payload[..]); - - let header = PacketHeader::with_type( - public_packet.packet_type().into(), - Some(payload.pn()), - None, - None, - None, - ); - let raw = RawInfo { - length: Some(public_packet.len() as u64), - payload_length: None, - data: None, - }; - - let mut frames = Vec::new(); - - while d.remaining() > 0 { - if let Ok(f) = Frame::decode(&mut d) { - frames.push(QuicFrame::from(f)); - } else { - qinfo!("qlog: invalid frame"); - break; +pub fn packet_received( + qlog: &NeqoQlog, + public_packet: &PublicPacket, + payload: &DecryptedPacket, + now: Instant, +) { + qlog.add_event_data_with_instant( + || { + let mut d = Decoder::from(&payload[..]); + + let header = PacketHeader::with_type( + public_packet.packet_type().into(), + Some(payload.pn()), + None, + None, + None, + ); + let raw = RawInfo { + length: Some(public_packet.len() as u64), + payload_length: None, + data: None, + }; + + let mut frames = Vec::new(); + + while d.remaining() > 0 { + if let Ok(f) = Frame::decode(&mut d) { + frames.push(QuicFrame::from(f)); + } else { + qinfo!("qlog: invalid frame"); + break; + } } - } - let ev_data = EventData::PacketReceived(PacketReceived { - header, - frames: Some(frames), - is_coalesced: None, - retry_token: None, - stateless_reset_token: None, - supported_versions: None, - raw: Some(raw), - datagram_id: None, - trigger: None, - }); - - stream.add_event_data_now(ev_data) - }); + Some(EventData::PacketReceived(PacketReceived { + header, + frames: Some(frames), + is_coalesced: None, + retry_token: None, + stateless_reset_token: None, + supported_versions: None, + raw: Some(raw), + datagram_id: None, + trigger: None, + })) + }, + now, + ); } #[allow(dead_code)] @@ -319,55 +362,60 @@ pub enum QlogMetric { PacingRate(u64), } -pub fn metrics_updated(qlog: &NeqoQlog, updated_metrics: &[QlogMetric]) { +pub fn metrics_updated(qlog: &NeqoQlog, updated_metrics: &[QlogMetric], now: Instant) { debug_assert!(!updated_metrics.is_empty()); - qlog.add_event_data(|| { - let mut min_rtt: Option = None; - let mut smoothed_rtt: Option = None; - let mut latest_rtt: Option = None; - let mut rtt_variance: Option = None; - let mut pto_count: Option = None; - let mut congestion_window: Option = None; - let mut bytes_in_flight: Option = None; - let mut ssthresh: Option = None; - let mut packets_in_flight: Option = None; - let mut pacing_rate: Option = None; - - for metric in updated_metrics { - #[allow(clippy::cast_precision_loss)] // Nought to do here. - match metric { - QlogMetric::MinRtt(v) => min_rtt = Some(v.as_secs_f32() * 1000.0), - QlogMetric::SmoothedRtt(v) => smoothed_rtt = Some(v.as_secs_f32() * 1000.0), - QlogMetric::LatestRtt(v) => latest_rtt = Some(v.as_secs_f32() * 1000.0), - QlogMetric::RttVariance(v) => rtt_variance = Some(v.as_secs_f32() * 1000.0), - QlogMetric::PtoCount(v) => pto_count = Some(u16::try_from(*v).unwrap()), - QlogMetric::CongestionWindow(v) => { - congestion_window = Some(u64::try_from(*v).unwrap()); + qlog.add_event_data_with_instant( + || { + let mut min_rtt: Option = None; + let mut smoothed_rtt: Option = None; + let mut latest_rtt: Option = None; + let mut rtt_variance: Option = None; + let mut pto_count: Option = None; + let mut congestion_window: Option = None; + let mut bytes_in_flight: Option = None; + let mut ssthresh: Option = None; + let mut packets_in_flight: Option = None; + let mut pacing_rate: Option = None; + + for metric in updated_metrics { + #[allow(clippy::cast_precision_loss)] // Nought to do here. + match metric { + QlogMetric::MinRtt(v) => min_rtt = Some(v.as_secs_f32() * 1000.0), + QlogMetric::SmoothedRtt(v) => smoothed_rtt = Some(v.as_secs_f32() * 1000.0), + QlogMetric::LatestRtt(v) => latest_rtt = Some(v.as_secs_f32() * 1000.0), + QlogMetric::RttVariance(v) => rtt_variance = Some(v.as_secs_f32() * 1000.0), + QlogMetric::PtoCount(v) => pto_count = Some(u16::try_from(*v).unwrap()), + QlogMetric::CongestionWindow(v) => { + congestion_window = Some(u64::try_from(*v).unwrap()); + } + QlogMetric::BytesInFlight(v) => { + bytes_in_flight = Some(u64::try_from(*v).unwrap()); + } + QlogMetric::SsThresh(v) => ssthresh = Some(u64::try_from(*v).unwrap()), + QlogMetric::PacketsInFlight(v) => packets_in_flight = Some(*v), + QlogMetric::PacingRate(v) => pacing_rate = Some(*v), + _ => (), } - QlogMetric::BytesInFlight(v) => bytes_in_flight = Some(u64::try_from(*v).unwrap()), - QlogMetric::SsThresh(v) => ssthresh = Some(u64::try_from(*v).unwrap()), - QlogMetric::PacketsInFlight(v) => packets_in_flight = Some(*v), - QlogMetric::PacingRate(v) => pacing_rate = Some(*v), - _ => (), } - } - let ev_data = EventData::MetricsUpdated(MetricsUpdated { - min_rtt, - smoothed_rtt, - latest_rtt, - rtt_variance, - pto_count, - congestion_window, - bytes_in_flight, - ssthresh, - packets_in_flight, - pacing_rate, - }); + let ev_data = EventData::MetricsUpdated(MetricsUpdated { + min_rtt, + smoothed_rtt, + latest_rtt, + rtt_variance, + pto_count, + congestion_window, + bytes_in_flight, + ssthresh, + packets_in_flight, + pacing_rate, + }); - Some(ev_data) - }); + Some(ev_data) + }, + now, + ); } // Helper functions diff --git a/neqo-transport/src/recovery/mod.rs b/neqo-transport/src/recovery/mod.rs index 0a4fb20096..63d689bfb5 100644 --- a/neqo-transport/src/recovery/mod.rs +++ b/neqo-transport/src/recovery/mod.rs @@ -527,11 +527,11 @@ impl LossRecovery { dropped } - pub fn on_packet_sent(&mut self, path: &PathRef, mut sent_packet: SentPacket) { + pub fn on_packet_sent(&mut self, path: &PathRef, mut sent_packet: SentPacket, now: Instant) { let pn_space = PacketNumberSpace::from(sent_packet.packet_type()); qtrace!([self], "packet {}-{} sent", pn_space, sent_packet.pn()); if let Some(space) = self.spaces.get_mut(pn_space) { - path.borrow_mut().packet_sent(&mut sent_packet); + path.borrow_mut().packet_sent(&mut sent_packet, now); space.on_packet_sent(sent_packet); } else { qwarn!( @@ -690,7 +690,7 @@ impl LossRecovery { if let Some(pto) = self.pto_time(rtt, PacketNumberSpace::ApplicationData) { if pto < now { let probes = PacketNumberSpaceSet::from(&[PacketNumberSpace::ApplicationData]); - self.fire_pto(PacketNumberSpace::ApplicationData, probes); + self.fire_pto(PacketNumberSpace::ApplicationData, probes, now); } } } @@ -804,7 +804,12 @@ impl LossRecovery { } } - fn fire_pto(&mut self, pn_space: PacketNumberSpace, allow_probes: PacketNumberSpaceSet) { + fn fire_pto( + &mut self, + pn_space: PacketNumberSpace, + allow_probes: PacketNumberSpaceSet, + now: Instant, + ) { if let Some(st) = &mut self.pto_state { st.pto(pn_space, allow_probes); } else { @@ -821,6 +826,7 @@ impl LossRecovery { &[QlogMetric::PtoCount( self.pto_state.as_ref().unwrap().count(), )], + now, ); } @@ -853,7 +859,7 @@ impl LossRecovery { // pto_time to increase which might cause PTO for later packet number spaces to not fire. if let Some(pn_space) = pto_space { qtrace!([self], "PTO {}, probing {:?}", pn_space, allow_probes); - self.fire_pto(pn_space, allow_probes); + self.fire_pto(pn_space, allow_probes, now); } } @@ -986,8 +992,8 @@ mod tests { .on_ack_received(&self.path, pn_space, acked_ranges, ack_ecn, ack_delay, now) } - pub fn on_packet_sent(&mut self, sent_packet: SentPacket) { - self.lr.on_packet_sent(&self.path, sent_packet); + pub fn on_packet_sent(&mut self, sent_packet: SentPacket, now: Instant) { + self.lr.on_packet_sent(&self.path, sent_packet, now); } pub fn timeout(&mut self, now: Instant) -> Vec { @@ -1026,7 +1032,7 @@ mod tests { None, ConnectionIdEntry::new(0, ConnectionId::from(&[1, 2, 3]), [0; 16]), ); - path.set_primary(true); + path.set_primary(true, now()); path.rtt_mut().set_initial(TEST_RTT); Self { lr: LossRecovery::new(StatsCell::default(), FAST_PTO_SCALE), @@ -1119,15 +1125,18 @@ mod tests { fn pace(lr: &mut Fixture, count: u64) { for pn in 0..count { - lr.on_packet_sent(SentPacket::new( - PacketType::Short, - pn, - IpTosEcn::default(), - pn_time(pn), - true, - Vec::new(), - ON_SENT_SIZE, - )); + lr.on_packet_sent( + SentPacket::new( + PacketType::Short, + pn, + IpTosEcn::default(), + pn_time(pn), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); } } @@ -1269,24 +1278,30 @@ mod tests { // So send two packets with 1/4 RTT between them. Acknowledge pn 1 after 1 RTT. // pn 0 should then be marked lost because it is then outstanding for 5RTT/4 // the loss time for packets is 9RTT/8. - lr.on_packet_sent(SentPacket::new( - PacketType::Short, - 0, - IpTosEcn::default(), - pn_time(0), - true, - Vec::new(), - ON_SENT_SIZE, - )); - lr.on_packet_sent(SentPacket::new( - PacketType::Short, - 1, - IpTosEcn::default(), - pn_time(0) + TEST_RTT / 4, - true, - Vec::new(), - ON_SENT_SIZE, - )); + lr.on_packet_sent( + SentPacket::new( + PacketType::Short, + 0, + IpTosEcn::default(), + pn_time(0), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); + lr.on_packet_sent( + SentPacket::new( + PacketType::Short, + 1, + IpTosEcn::default(), + pn_time(0) + TEST_RTT / 4, + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); let (_, lost) = lr.on_ack_received( PacketNumberSpace::ApplicationData, vec![1..=1], @@ -1374,33 +1389,42 @@ mod tests { #[test] fn drop_spaces() { let mut lr = Fixture::default(); - lr.on_packet_sent(SentPacket::new( - PacketType::Initial, - 0, - IpTosEcn::default(), - pn_time(0), - true, - Vec::new(), - ON_SENT_SIZE, - )); - lr.on_packet_sent(SentPacket::new( - PacketType::Handshake, - 0, - IpTosEcn::default(), - pn_time(1), - true, - Vec::new(), - ON_SENT_SIZE, - )); - lr.on_packet_sent(SentPacket::new( - PacketType::Short, - 0, - IpTosEcn::default(), - pn_time(2), - true, - Vec::new(), - ON_SENT_SIZE, - )); + lr.on_packet_sent( + SentPacket::new( + PacketType::Initial, + 0, + IpTosEcn::default(), + pn_time(0), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); + lr.on_packet_sent( + SentPacket::new( + PacketType::Handshake, + 0, + IpTosEcn::default(), + pn_time(1), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); + lr.on_packet_sent( + SentPacket::new( + PacketType::Short, + 0, + IpTosEcn::default(), + pn_time(2), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); // Now put all spaces on the LR timer so we can see them. for sp in &[ @@ -1418,7 +1442,7 @@ mod tests { ON_SENT_SIZE, ); let pn_space = PacketNumberSpace::from(sent_pkt.packet_type()); - lr.on_packet_sent(sent_pkt); + lr.on_packet_sent(sent_pkt, Instant::now()); lr.on_ack_received( pn_space, vec![1..=1], @@ -1444,30 +1468,36 @@ mod tests { // There are cases where we send a packet that is not subsequently tracked. // So check that this works. - lr.on_packet_sent(SentPacket::new( - PacketType::Initial, - 0, - IpTosEcn::default(), - pn_time(3), - true, - Vec::new(), - ON_SENT_SIZE, - )); + lr.on_packet_sent( + SentPacket::new( + PacketType::Initial, + 0, + IpTosEcn::default(), + pn_time(3), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); assert_sent_times(&lr, None, None, Some(pn_time(2))); } #[test] fn rearm_pto_after_confirmed() { let mut lr = Fixture::default(); - lr.on_packet_sent(SentPacket::new( - PacketType::Initial, - 0, - IpTosEcn::default(), - now(), - true, - Vec::new(), - ON_SENT_SIZE, - )); + lr.on_packet_sent( + SentPacket::new( + PacketType::Initial, + 0, + IpTosEcn::default(), + now(), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); // Set the RTT to the initial value so that discarding doesn't // alter the estimate. let rtt = lr.path.borrow().rtt().estimate(); @@ -1479,24 +1509,30 @@ mod tests { now() + rtt, ); - lr.on_packet_sent(SentPacket::new( - PacketType::Handshake, - 0, - IpTosEcn::default(), - now(), - true, - Vec::new(), - ON_SENT_SIZE, - )); - lr.on_packet_sent(SentPacket::new( - PacketType::Short, - 0, - IpTosEcn::default(), - now(), - true, - Vec::new(), - ON_SENT_SIZE, - )); + lr.on_packet_sent( + SentPacket::new( + PacketType::Handshake, + 0, + IpTosEcn::default(), + now(), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); + lr.on_packet_sent( + SentPacket::new( + PacketType::Short, + 0, + IpTosEcn::default(), + now(), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); assert!(lr.pto_time(PacketNumberSpace::ApplicationData).is_some()); lr.discard(PacketNumberSpace::Initial, pn_time(1)); @@ -1526,15 +1562,18 @@ mod tests { assert_eq!(path.amplification_limit(), SPARE); } - lr.on_packet_sent(SentPacket::new( - PacketType::Initial, - 0, - IpTosEcn::default(), - now(), - true, - Vec::new(), - ON_SENT_SIZE, - )); + lr.on_packet_sent( + SentPacket::new( + PacketType::Initial, + 0, + IpTosEcn::default(), + now(), + true, + Vec::new(), + ON_SENT_SIZE, + ), + Instant::now(), + ); let handshake_pto = lr.path.borrow().rtt().pto(false); let expected_pto = now() + handshake_pto; diff --git a/neqo-transport/src/rtt.rs b/neqo-transport/src/rtt.rs index ba623a1a1c..c18bbd62ec 100644 --- a/neqo-transport/src/rtt.rs +++ b/neqo-transport/src/rtt.rs @@ -154,6 +154,7 @@ impl RttEstimate { QlogMetric::SmoothedRtt(self.smoothed_rtt), QlogMetric::RttVariance(self.rttvar), ], + now, ); } diff --git a/neqo-transport/src/sender.rs b/neqo-transport/src/sender.rs index a9ead627aa..e6075ee327 100644 --- a/neqo-transport/src/sender.rs +++ b/neqo-transport/src/sender.rs @@ -128,6 +128,7 @@ impl PacketSender { prev_largest_acked_sent, pto, lost_packets, + now, ); // Call below may change the size of MTU probes, so it needs to happen after the CC // reaction above, which needs to ignore probes based on their size. @@ -137,24 +138,24 @@ impl PacketSender { } /// Called when ECN CE mark received. Returns true if the congestion window was reduced. - pub fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket) -> bool { - self.cc.on_ecn_ce_received(largest_acked_pkt) + pub fn on_ecn_ce_received(&mut self, largest_acked_pkt: &SentPacket, now: Instant) -> bool { + self.cc.on_ecn_ce_received(largest_acked_pkt, now) } - pub fn discard(&mut self, pkt: &SentPacket) { - self.cc.discard(pkt); + pub fn discard(&mut self, pkt: &SentPacket, now: Instant) { + self.cc.discard(pkt, now); } /// When we migrate, the congestion controller for the previously active path drops /// all bytes in flight. - pub fn discard_in_flight(&mut self) { - self.cc.discard_in_flight(); + pub fn discard_in_flight(&mut self, now: Instant) { + self.cc.discard_in_flight(now); } - pub fn on_packet_sent(&mut self, pkt: &SentPacket, rtt: Duration) { + pub fn on_packet_sent(&mut self, pkt: &SentPacket, rtt: Duration, now: Instant) { self.pacer .spend(pkt.time_sent(), rtt, self.cc.cwnd(), pkt.len()); - self.cc.on_packet_sent(pkt); + self.cc.on_packet_sent(pkt, now); } #[must_use] diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 66bec3d77e..7a4472ae1a 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -332,6 +332,7 @@ impl Server { &self.create_qlog_trace(orig_dcid.unwrap_or(initial.dst_cid).as_cid_ref()), self.conn_params.get_versions().all(), initial.version.wire_version(), + now, ); } Output::None @@ -390,6 +391,7 @@ impl Server { &self.create_qlog_trace(packet.dcid()), self.conn_params.get_versions().all(), packet.wire_version(), + now, ); return Output::Datagram(Datagram::new(