From cac7e45097bd3dcc94d7028f249405e1b4efb90e Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 16 Sep 2024 19:43:34 +0200 Subject: [PATCH] iface: make poll() process all packets, add fine-grained poll functions. This makes `.poll()` behave the same as before #954. Users affected by DoS concerns can use the finer-grained egress-only and single-packet-ingress-only fns. --- src/iface/interface/ipv4.rs | 9 +- src/iface/interface/mod.rs | 209 +++++++++++++++++++++++-------- src/iface/interface/multicast.rs | 5 +- src/iface/interface/sixlowpan.rs | 13 +- 4 files changed, 162 insertions(+), 74 deletions(-) diff --git a/src/iface/interface/ipv4.rs b/src/iface/interface/ipv4.rs index ef5766585..2e93a336e 100644 --- a/src/iface/interface/ipv4.rs +++ b/src/iface/interface/ipv4.rs @@ -7,17 +7,14 @@ impl Interface { /// processed or emitted, and thus, whether the readiness of any socket might /// have changed. #[cfg(feature = "proto-ipv4-fragmentation")] - pub(super) fn ipv4_egress(&mut self, device: &mut D) -> bool - where - D: Device + ?Sized, - { + pub(super) fn ipv4_egress(&mut self, device: &mut (impl Device + ?Sized)) { // Reset the buffer when we transmitted everything. if self.fragmenter.finished() { self.fragmenter.reset(); } if self.fragmenter.is_empty() { - return false; + return; } let pkt = &self.fragmenter; @@ -25,10 +22,8 @@ impl Interface { if let Some(tx_token) = device.transmit(self.inner.now) { self.inner .dispatch_ipv4_frag(tx_token, &mut self.fragmenter); - return true; } } - false } } diff --git a/src/iface/interface/mod.rs b/src/iface/interface/mod.rs index c850d3c8f..dda78bd3d 100644 --- a/src/iface/interface/mod.rs +++ b/src/iface/interface/mod.rs @@ -26,6 +26,7 @@ mod udp; use super::packet::*; +use core::ops::{BitOr, BitOrAssign}; use core::result::Result; use heapless::Vec; @@ -65,6 +66,60 @@ macro_rules! check { } use check; +/// Result returned by [`Interface::poll`]. +/// +/// This contains information on whether socket states might have changed. +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum PollResult { + /// Socket state is guaranteed to not have changed. + None, + /// You should check the state of sockets again for received data or completion of operations. + SocketStateChanged, +} + +impl BitOr for PollResult { + type Output = Self; + fn bitor(self, rhs: Self) -> Self::Output { + match (self, rhs) { + (PollResult::None, PollResult::None) => PollResult::None, + _ => PollResult::SocketStateChanged, + } + } +} + +impl BitOrAssign for PollResult { + fn bitor_assign(&mut self, rhs: Self) { + *self = *self | rhs; + } +} + +/// Result returned by [`Interface::poll_ingress_single`]. +/// +/// This contains information on whether a packet was processed or not, +/// and whether it might've affected socket states. +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum PollIngressSingleResult { + /// No packet was processed. You don't need to call [`Interface::poll_ingress_single`] + /// again, until more packets arrive. + /// + /// Socket state is guaranteed to not have changed. + None, + /// A packet was processed. + /// + /// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again. + /// + /// Socket state is guaranteed to not have changed. + PacketProcessed, + /// A packet was processed, which might have caused socket state to change. + /// + /// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again. + /// + /// You should check the state of sockets again for received data or completion of operations. + SocketStateChanged, +} + /// A network interface. /// /// The network interface logically owns a number of other data structures; to avoid @@ -150,10 +205,7 @@ impl Interface { /// # Panics /// This function panics if the [`Config::hardware_address`] does not match /// the medium of the device. - pub fn new(config: Config, device: &mut D, now: Instant) -> Self - where - D: Device + ?Sized, - { + pub fn new(config: Config, device: &mut (impl Device + ?Sized), now: Instant) -> Self { let caps = device.capabilities(); assert_eq!( config.hardware_addr.medium(), @@ -375,59 +427,107 @@ impl Interface { self.fragments.reassembly_timeout = timeout; } - /// Transmit packets queued in the given sockets, and receive packets queued + /// Transmit packets queued in the sockets, and receive packets queued /// in the device. /// - /// This function returns a boolean value indicating whether any packets were - /// processed or emitted, and thus, whether the readiness of any socket might - /// have changed. + /// This function returns a value indicating whether the state of any socket + /// might have changed. /// - /// # Note - /// This function performs a bounded amount of work per call to avoid - /// starving other tasks of CPU time. If it returns true, there may still be - /// packets to be received or transmitted. Depending on system design, - /// calling this function in a loop may cause a denial of service if - /// packets cannot be processed faster than they arrive. - pub fn poll( + /// ## DoS warning + /// + /// This function processes all packets in the device's queue. This can + /// be an unbounded amount of work if packets arrive faster than they're + /// processed. + /// + /// If this is a concern for your application (i.e. your environment doesn't + /// have preemptive scheduling, or `poll()` is called from a main loop where + /// other important things are processed), you may use the lower-level methods + /// [`poll_egress()`](Self::poll_egress) and [`poll_ingress_single()`](Self::poll_ingress_single). + /// This allows you to insert yields or process other events between processing + /// individual ingress packets. + pub fn poll( &mut self, timestamp: Instant, - device: &mut D, + device: &mut (impl Device + ?Sized), sockets: &mut SocketSet<'_>, - ) -> bool - where - D: Device + ?Sized, - { + ) -> PollResult { self.inner.now = timestamp; + let mut res = PollResult::None; + #[cfg(feature = "_proto-fragmentation")] self.fragments.assembler.remove_expired(timestamp); + // Process ingress while there's packets available. + loop { + match self.socket_ingress(device, sockets) { + PollIngressSingleResult::None => break, + PollIngressSingleResult::PacketProcessed => {} + PollIngressSingleResult::SocketStateChanged => res = PollResult::SocketStateChanged, + } + } + + // Process egress. + match self.poll_egress(timestamp, device, sockets) { + PollResult::None => {} + PollResult::SocketStateChanged => res = PollResult::SocketStateChanged, + } + + res + } + + /// Transmit packets queued in the sockets. + /// + /// This function returns a value indicating whether the state of any socket + /// might have changed. + /// + /// This is guaranteed to always perform a bounded amount of work. + pub fn poll_egress( + &mut self, + timestamp: Instant, + device: &mut (impl Device + ?Sized), + sockets: &mut SocketSet<'_>, + ) -> PollResult { + self.inner.now = timestamp; + match self.inner.caps.medium { #[cfg(feature = "medium-ieee802154")] - Medium::Ieee802154 => - { + Medium::Ieee802154 => { #[cfg(feature = "proto-sixlowpan-fragmentation")] - if self.sixlowpan_egress(device) { - return true; - } + self.sixlowpan_egress(device); } #[cfg(any(feature = "medium-ethernet", feature = "medium-ip"))] - _ => - { + _ => { #[cfg(feature = "proto-ipv4-fragmentation")] - if self.ipv4_egress(device) { - return true; - } + self.ipv4_egress(device); } } - let mut readiness_may_have_changed = self.socket_ingress(device, sockets); - readiness_may_have_changed |= self.socket_egress(device, sockets); - #[cfg(feature = "multicast")] self.multicast_egress(device); - readiness_may_have_changed + self.socket_egress(device, sockets) + } + + /// Process one incoming packet queued in the device. + /// + /// Returns a value indicating: + /// - whether a packet was processed, in which case you have to call this method again in case there's more packets queued. + /// - whether the state of any socket might have changed. + /// + /// Since it processes at most one packet, this is guaranteed to always perform a bounded amount of work. + pub fn poll_ingress_single( + &mut self, + timestamp: Instant, + device: &mut (impl Device + ?Sized), + sockets: &mut SocketSet<'_>, + ) -> PollIngressSingleResult { + self.inner.now = timestamp; + + #[cfg(feature = "_proto-fragmentation")] + self.fragments.assembler.remove_expired(timestamp); + + self.socket_ingress(device, sockets) } /// Return a _soft deadline_ for calling [poll] the next time. @@ -480,20 +580,19 @@ impl Interface { } } - fn socket_ingress(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool - where - D: Device + ?Sized, - { - let mut processed_any = false; - + fn socket_ingress( + &mut self, + device: &mut (impl Device + ?Sized), + sockets: &mut SocketSet<'_>, + ) -> PollIngressSingleResult { let Some((rx_token, tx_token)) = device.receive(self.inner.now) else { - return processed_any; + return PollIngressSingleResult::None; }; let rx_meta = rx_token.meta(); rx_token.consume(|frame| { if frame.is_empty() { - return; + return PollIngressSingleResult::PacketProcessed; } match self.inner.caps.medium { @@ -543,16 +642,22 @@ impl Interface { } } } - processed_any = true; - }); - processed_any + // TODO: Propagate the PollIngressSingleResult from deeper. + // There's many received packets that we process but can't cause sockets + // to change state. For example IP fragments, multicast stuff, ICMP pings + // if they dont't match any raw socket... + // We should return `PacketProcessed` for these to save the user from + // doing useless socket polls. + PollIngressSingleResult::SocketStateChanged + }) } - fn socket_egress(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool - where - D: Device + ?Sized, - { + fn socket_egress( + &mut self, + device: &mut (impl Device + ?Sized), + sockets: &mut SocketSet<'_>, + ) -> PollResult { let _caps = device.capabilities(); enum EgressError { @@ -560,7 +665,7 @@ impl Interface { Dispatch, } - let mut emitted_any = false; + let mut result = PollResult::None; for item in sockets.items_mut() { if !item .meta @@ -581,7 +686,7 @@ impl Interface { .dispatch_ip(t, meta, response, &mut self.fragmenter) .map_err(|_| EgressError::Dispatch)?; - emitted_any = true; + result = PollResult::SocketStateChanged; Ok(()) }; @@ -663,7 +768,7 @@ impl Interface { Ok(()) => {} } } - emitted_any + result } } diff --git a/src/iface/interface/multicast.rs b/src/iface/interface/multicast.rs index d66ba4d17..7542a12f4 100644 --- a/src/iface/interface/multicast.rs +++ b/src/iface/interface/multicast.rs @@ -145,10 +145,7 @@ impl Interface { /// - Send join/leave packets according to the multicast group state. /// - Depending on `igmp_report_state` and the therein contained /// timeouts, send IGMP membership reports. - pub(crate) fn multicast_egress(&mut self, device: &mut D) - where - D: Device + ?Sized, - { + pub(crate) fn multicast_egress(&mut self, device: &mut (impl Device + ?Sized)) { // Process multicast joins. while let Some((&addr, _)) = self .inner diff --git a/src/iface/interface/sixlowpan.rs b/src/iface/interface/sixlowpan.rs index a89934f1f..5cdfa3315 100644 --- a/src/iface/interface/sixlowpan.rs +++ b/src/iface/interface/sixlowpan.rs @@ -7,22 +7,15 @@ pub(crate) const MAX_DECOMPRESSED_LEN: usize = 1500; impl Interface { /// Process fragments that still need to be sent for 6LoWPAN packets. - /// - /// This function returns a boolean value indicating whether any packets were - /// processed or emitted, and thus, whether the readiness of any socket might - /// have changed. #[cfg(feature = "proto-sixlowpan-fragmentation")] - pub(super) fn sixlowpan_egress(&mut self, device: &mut D) -> bool - where - D: Device + ?Sized, - { + pub(super) fn sixlowpan_egress(&mut self, device: &mut (impl Device + ?Sized)) { // Reset the buffer when we transmitted everything. if self.fragmenter.finished() { self.fragmenter.reset(); } if self.fragmenter.is_empty() { - return false; + return; } let pkt = &self.fragmenter; @@ -30,10 +23,8 @@ impl Interface { if let Some(tx_token) = device.transmit(self.inner.now) { self.inner .dispatch_ieee802154_frag(tx_token, &mut self.fragmenter); - return true; } } - false } /// Get the 6LoWPAN address contexts.