diff --git a/src/iface/interface/ipv4.rs b/src/iface/interface/ipv4.rs index ef5766585..2e93a336e 100644 --- a/src/iface/interface/ipv4.rs +++ b/src/iface/interface/ipv4.rs @@ -7,17 +7,14 @@ impl Interface { /// processed or emitted, and thus, whether the readiness of any socket might /// have changed. #[cfg(feature = "proto-ipv4-fragmentation")] - pub(super) fn ipv4_egress(&mut self, device: &mut D) -> bool - where - D: Device + ?Sized, - { + pub(super) fn ipv4_egress(&mut self, device: &mut (impl Device + ?Sized)) { // Reset the buffer when we transmitted everything. if self.fragmenter.finished() { self.fragmenter.reset(); } if self.fragmenter.is_empty() { - return false; + return; } let pkt = &self.fragmenter; @@ -25,10 +22,8 @@ impl Interface { if let Some(tx_token) = device.transmit(self.inner.now) { self.inner .dispatch_ipv4_frag(tx_token, &mut self.fragmenter); - return true; } } - false } } diff --git a/src/iface/interface/mod.rs b/src/iface/interface/mod.rs index c850d3c8f..0a03cb9b5 100644 --- a/src/iface/interface/mod.rs +++ b/src/iface/interface/mod.rs @@ -65,6 +65,44 @@ macro_rules! check { } use check; +/// Result returned by [`Interface::poll`]. +/// +/// This contains information on whether socket states might have changed. +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum PollResult { + /// Socket state is guaranteed to not have changed. + None, + /// You should check the state of sockets again for received data or completion of operations. + SocketStateChanged, +} + +/// Result returned by [`Interface::poll_ingress_single`]. +/// +/// This contains information on whether a packet was processed or not, +/// and whether it might've affected socket states. +#[derive(Copy, Clone, PartialEq, Eq, Debug)] +#[cfg_attr(feature = "defmt", derive(defmt::Format))] +pub enum PollIngressSingleResult { + /// No packet was processed. You don't need to call [`Interface::poll_ingress_single`] + /// again, until more packets arrive. + /// + /// Socket state is guaranteed to not have changed. + None, + /// A packet was processed. + /// + /// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again. + /// + /// Socket state is guaranteed to not have changed. + PacketProcessed, + /// A packet was processed, which might have caused socket state to change. + /// + /// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again. + /// + /// You should check the state of sockets again for received data or completion of operations. + SocketStateChanged, +} + /// A network interface. /// /// The network interface logically owns a number of other data structures; to avoid @@ -150,10 +188,7 @@ impl Interface { /// # Panics /// This function panics if the [`Config::hardware_address`] does not match /// the medium of the device. - pub fn new(config: Config, device: &mut D, now: Instant) -> Self - where - D: Device + ?Sized, - { + pub fn new(config: Config, device: &mut (impl Device + ?Sized), now: Instant) -> Self { let caps = device.capabilities(); assert_eq!( config.hardware_addr.medium(), @@ -375,59 +410,107 @@ impl Interface { self.fragments.reassembly_timeout = timeout; } - /// Transmit packets queued in the given sockets, and receive packets queued + /// Transmit packets queued in the sockets, and receive packets queued /// in the device. /// - /// This function returns a boolean value indicating whether any packets were - /// processed or emitted, and thus, whether the readiness of any socket might - /// have changed. + /// This function returns a value indicating whether the state of any socket + /// might have changed. + /// + /// ## DoS warning /// - /// # Note - /// This function performs a bounded amount of work per call to avoid - /// starving other tasks of CPU time. If it returns true, there may still be - /// packets to be received or transmitted. Depending on system design, - /// calling this function in a loop may cause a denial of service if - /// packets cannot be processed faster than they arrive. - pub fn poll( + /// This function processes all packets in the device's queue. This can + /// be an unbounded amount of work if packets arrive faster than they're + /// processed. + /// + /// If this is a concern for your application (i.e. your environment doesn't + /// have preemptive scheduling, or `poll()` is called from a main loop where + /// other important things are processed), you may use the lower-level methods + /// [`poll_egress()`](Self::poll_egress) and [`poll_ingress_single()`](Self::poll_ingress_single). + /// This allows you to insert yields or process other events between processing + /// individual ingress packets. + pub fn poll( &mut self, timestamp: Instant, - device: &mut D, + device: &mut (impl Device + ?Sized), sockets: &mut SocketSet<'_>, - ) -> bool - where - D: Device + ?Sized, - { + ) -> PollResult { self.inner.now = timestamp; + let mut res = PollResult::None; + #[cfg(feature = "_proto-fragmentation")] self.fragments.assembler.remove_expired(timestamp); + // Process ingress while there's packets available. + loop { + match self.socket_ingress(device, sockets) { + PollIngressSingleResult::None => break, + PollIngressSingleResult::PacketProcessed => {} + PollIngressSingleResult::SocketStateChanged => res = PollResult::SocketStateChanged, + } + } + + // Process egress. + match self.poll_egress(timestamp, device, sockets) { + PollResult::None => {} + PollResult::SocketStateChanged => res = PollResult::SocketStateChanged, + } + + res + } + + /// Transmit packets queued in the sockets. + /// + /// This function returns a value indicating whether the state of any socket + /// might have changed. + /// + /// This is guaranteed to always perform a bounded amount of work. + pub fn poll_egress( + &mut self, + timestamp: Instant, + device: &mut (impl Device + ?Sized), + sockets: &mut SocketSet<'_>, + ) -> PollResult { + self.inner.now = timestamp; + match self.inner.caps.medium { #[cfg(feature = "medium-ieee802154")] - Medium::Ieee802154 => - { + Medium::Ieee802154 => { #[cfg(feature = "proto-sixlowpan-fragmentation")] - if self.sixlowpan_egress(device) { - return true; - } + self.sixlowpan_egress(device); } #[cfg(any(feature = "medium-ethernet", feature = "medium-ip"))] - _ => - { + _ => { #[cfg(feature = "proto-ipv4-fragmentation")] - if self.ipv4_egress(device) { - return true; - } + self.ipv4_egress(device); } } - let mut readiness_may_have_changed = self.socket_ingress(device, sockets); - readiness_may_have_changed |= self.socket_egress(device, sockets); - #[cfg(feature = "multicast")] self.multicast_egress(device); - readiness_may_have_changed + self.socket_egress(device, sockets) + } + + /// Process one incoming packet queued in the device. + /// + /// Returns a value indicating: + /// - whether a packet was processed, in which case you have to call this method again in case there's more packets queued. + /// - whether the state of any socket might have changed. + /// + /// Since it processes at most one packet, this is guaranteed to always perform a bounded amount of work. + pub fn poll_ingress_single( + &mut self, + timestamp: Instant, + device: &mut (impl Device + ?Sized), + sockets: &mut SocketSet<'_>, + ) -> PollIngressSingleResult { + self.inner.now = timestamp; + + #[cfg(feature = "_proto-fragmentation")] + self.fragments.assembler.remove_expired(timestamp); + + self.socket_ingress(device, sockets) } /// Return a _soft deadline_ for calling [poll] the next time. @@ -480,20 +563,19 @@ impl Interface { } } - fn socket_ingress(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool - where - D: Device + ?Sized, - { - let mut processed_any = false; - + fn socket_ingress( + &mut self, + device: &mut (impl Device + ?Sized), + sockets: &mut SocketSet<'_>, + ) -> PollIngressSingleResult { let Some((rx_token, tx_token)) = device.receive(self.inner.now) else { - return processed_any; + return PollIngressSingleResult::None; }; let rx_meta = rx_token.meta(); rx_token.consume(|frame| { if frame.is_empty() { - return; + return PollIngressSingleResult::PacketProcessed; } match self.inner.caps.medium { @@ -543,16 +625,22 @@ impl Interface { } } } - processed_any = true; - }); - processed_any + // TODO: Propagate the PollIngressSingleResult from deeper. + // There's many received packets that we process but can't cause sockets + // to change state. For example IP fragments, multicast stuff, ICMP pings + // if they dont't match any raw socket... + // We should return `PacketProcessed` for these to save the user from + // doing useless socket polls. + PollIngressSingleResult::SocketStateChanged + }) } - fn socket_egress(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool - where - D: Device + ?Sized, - { + fn socket_egress( + &mut self, + device: &mut (impl Device + ?Sized), + sockets: &mut SocketSet<'_>, + ) -> PollResult { let _caps = device.capabilities(); enum EgressError { @@ -560,7 +648,7 @@ impl Interface { Dispatch, } - let mut emitted_any = false; + let mut result = PollResult::None; for item in sockets.items_mut() { if !item .meta @@ -581,7 +669,7 @@ impl Interface { .dispatch_ip(t, meta, response, &mut self.fragmenter) .map_err(|_| EgressError::Dispatch)?; - emitted_any = true; + result = PollResult::SocketStateChanged; Ok(()) }; @@ -663,7 +751,7 @@ impl Interface { Ok(()) => {} } } - emitted_any + result } } diff --git a/src/iface/interface/multicast.rs b/src/iface/interface/multicast.rs index d66ba4d17..7542a12f4 100644 --- a/src/iface/interface/multicast.rs +++ b/src/iface/interface/multicast.rs @@ -145,10 +145,7 @@ impl Interface { /// - Send join/leave packets according to the multicast group state. /// - Depending on `igmp_report_state` and the therein contained /// timeouts, send IGMP membership reports. - pub(crate) fn multicast_egress(&mut self, device: &mut D) - where - D: Device + ?Sized, - { + pub(crate) fn multicast_egress(&mut self, device: &mut (impl Device + ?Sized)) { // Process multicast joins. while let Some((&addr, _)) = self .inner diff --git a/src/iface/interface/sixlowpan.rs b/src/iface/interface/sixlowpan.rs index a89934f1f..5cdfa3315 100644 --- a/src/iface/interface/sixlowpan.rs +++ b/src/iface/interface/sixlowpan.rs @@ -7,22 +7,15 @@ pub(crate) const MAX_DECOMPRESSED_LEN: usize = 1500; impl Interface { /// Process fragments that still need to be sent for 6LoWPAN packets. - /// - /// This function returns a boolean value indicating whether any packets were - /// processed or emitted, and thus, whether the readiness of any socket might - /// have changed. #[cfg(feature = "proto-sixlowpan-fragmentation")] - pub(super) fn sixlowpan_egress(&mut self, device: &mut D) -> bool - where - D: Device + ?Sized, - { + pub(super) fn sixlowpan_egress(&mut self, device: &mut (impl Device + ?Sized)) { // Reset the buffer when we transmitted everything. if self.fragmenter.finished() { self.fragmenter.reset(); } if self.fragmenter.is_empty() { - return false; + return; } let pkt = &self.fragmenter; @@ -30,10 +23,8 @@ impl Interface { if let Some(tx_token) = device.transmit(self.inner.now) { self.inner .dispatch_ieee802154_frag(tx_token, &mut self.fragmenter); - return true; } } - false } /// Get the 6LoWPAN address contexts. diff --git a/src/iface/interface/tests/ipv4.rs b/src/iface/interface/tests/ipv4.rs index 85fb82c5e..41e35301c 100644 --- a/src/iface/interface/tests/ipv4.rs +++ b/src/iface/interface/tests/ipv4.rs @@ -721,20 +721,14 @@ fn test_handle_igmp(#[case] medium: Medium) { } // General query - let timestamp = Instant::ZERO; const GENERAL_QUERY_BYTES: &[u8] = &[ 0x46, 0xc0, 0x00, 0x24, 0xed, 0xb4, 0x00, 0x00, 0x01, 0x02, 0x47, 0x43, 0xac, 0x16, 0x63, 0x04, 0xe0, 0x00, 0x00, 0x01, 0x94, 0x04, 0x00, 0x00, 0x11, 0x64, 0xec, 0x8f, 0x00, 0x00, 0x00, 0x00, 0x02, 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, ]; - { - // Transmit GENERAL_QUERY_BYTES into loopback - let tx_token = device.transmit(timestamp).unwrap(); - tx_token.consume(GENERAL_QUERY_BYTES.len(), |buffer| { - buffer.copy_from_slice(GENERAL_QUERY_BYTES); - }); - } + device.rx_queue.push_back(GENERAL_QUERY_BYTES.to_vec()); + // Trigger processing until all packets received through the // loopback have been processed, including responses to // GENERAL_QUERY_BYTES. Therefore `recv_all()` would return 0 diff --git a/src/iface/interface/tests/mod.rs b/src/iface/interface/tests/mod.rs index db8cc89dd..fe98ddd3d 100644 --- a/src/iface/interface/tests/mod.rs +++ b/src/iface/interface/tests/mod.rs @@ -30,10 +30,8 @@ fn fill_slice(s: &mut [u8], val: u8) { #[allow(unused)] fn recv_all(device: &mut crate::tests::TestingDevice, timestamp: Instant) -> Vec> { let mut pkts = Vec::new(); - while let Some((rx, _tx)) = device.receive(timestamp) { - rx.consume(|pkt| { - pkts.push(pkt.to_vec()); - }); + while let Some(pkt) = device.tx_queue.pop_front() { + pkts.push(pkt) } pkts } diff --git a/src/iface/interface/tests/sixlowpan.rs b/src/iface/interface/tests/sixlowpan.rs index b80024a8b..14a4c46ae 100644 --- a/src/iface/interface/tests/sixlowpan.rs +++ b/src/iface/interface/tests/sixlowpan.rs @@ -244,7 +244,7 @@ fn test_echo_request_sixlowpan_128_bytes() { ); assert_eq!( - device.queue.pop_front().unwrap(), + device.tx_queue.pop_front().unwrap(), &[ 0x41, 0xcc, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0xc0, 0xb0, 0x5, 0x4e, 0x7a, 0x11, 0x3a, 0x92, 0xfc, 0x48, 0xc2, @@ -261,7 +261,7 @@ fn test_echo_request_sixlowpan_128_bytes() { iface.poll(Instant::now(), &mut device, &mut sockets); assert_eq!( - device.queue.pop_front().unwrap(), + device.tx_queue.pop_front().unwrap(), &[ 0x41, 0xcc, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0xe0, 0xb0, 0x5, 0x4e, 0xf, 0x48, 0x49, 0x4a, 0x4b, 0x4c, 0x4d, @@ -415,7 +415,7 @@ In at rhoncus tortor. Cras blandit tellus diam, varius vestibulum nibh commodo n iface.poll(Instant::now(), &mut device, &mut sockets); assert_eq!( - device.queue.pop_front().unwrap(), + device.tx_queue.pop_front().unwrap(), &[ 0x41, 0xcc, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0xc0, 0xb4, 0x5, 0x4e, 0x7e, 0x40, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, @@ -430,7 +430,7 @@ In at rhoncus tortor. Cras blandit tellus diam, varius vestibulum nibh commodo n ); assert_eq!( - device.queue.pop_front().unwrap(), + device.tx_queue.pop_front().unwrap(), &[ 0x41, 0xcc, 0x4, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0x2, 0xe0, 0xb4, 0x5, 0x4e, 0xf, 0x6f, 0x72, 0x74, 0x6f, 0x72, 0x2e, diff --git a/src/tests.rs b/src/tests.rs index b9eb740be..165bd52f7 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,4 +1,8 @@ +use std::collections::VecDeque; + use crate::iface::*; +use crate::phy::{self, Device, DeviceCapabilities, Medium}; +use crate::time::Instant; use crate::wire::*; pub(crate) fn setup<'a>(medium: Medium) -> (Interface, SocketSet<'a>, TestingDevice) { @@ -49,16 +53,11 @@ pub(crate) fn setup<'a>(medium: Medium) -> (Interface, SocketSet<'a>, TestingDev (iface, SocketSet::new(vec![]), device) } -use heapless::Deque; -use heapless::Vec; - -use crate::phy::{self, Device, DeviceCapabilities, Medium}; -use crate::time::Instant; - /// A testing device. #[derive(Debug)] pub struct TestingDevice { - pub(crate) queue: Deque, 4>, + pub(crate) tx_queue: VecDeque>, + pub(crate) rx_queue: VecDeque>, max_transmission_unit: usize, medium: Medium, } @@ -71,7 +70,8 @@ impl TestingDevice { /// in FIFO order. pub fn new(medium: Medium) -> Self { TestingDevice { - queue: Deque::new(), + tx_queue: VecDeque::new(), + rx_queue: VecDeque::new(), max_transmission_unit: match medium { #[cfg(feature = "medium-ethernet")] Medium::Ethernet => 1514, @@ -98,10 +98,10 @@ impl Device for TestingDevice { } fn receive(&mut self, _timestamp: Instant) -> Option<(Self::RxToken<'_>, Self::TxToken<'_>)> { - self.queue.pop_front().map(move |buffer| { + self.rx_queue.pop_front().map(move |buffer| { let rx = RxToken { buffer }; let tx = TxToken { - queue: &mut self.queue, + queue: &mut self.tx_queue, }; (rx, tx) }) @@ -109,14 +109,14 @@ impl Device for TestingDevice { fn transmit(&mut self, _timestamp: Instant) -> Option> { Some(TxToken { - queue: &mut self.queue, + queue: &mut self.tx_queue, }) } } #[doc(hidden)] pub struct RxToken { - buffer: Vec, + buffer: Vec, } impl phy::RxToken for RxToken { @@ -131,7 +131,7 @@ impl phy::RxToken for RxToken { #[doc(hidden)] #[derive(Debug)] pub struct TxToken<'a> { - queue: &'a mut Deque, 4>, + queue: &'a mut VecDeque>, } impl<'a> phy::TxToken for TxToken<'a> { @@ -139,10 +139,9 @@ impl<'a> phy::TxToken for TxToken<'a> { where F: FnOnce(&mut [u8]) -> R, { - let mut buffer = Vec::new(); - buffer.resize(len, 0).unwrap(); + let mut buffer = vec![0; len]; let result = f(&mut buffer); - self.queue.push_back(buffer).unwrap(); + self.queue.push_back(buffer); result } }