diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fbfb4a..48abcf0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,10 @@ Released on 13/10/2024 - Bump `ratatui` version to `0.28` - Dont enable `MouseCapture` by default - Add function `enable_mouse_capture` and `disable_mouse_capture` to `TerminalBridge` +- **Max poll for ports**: + - Add `Port::set_max_poll` to set the amount a `Port` is polled in a single `Port::should_poll`. + - Add `EventListenerCfg::port` to add a manually constructed `Port` + - Previous `EventListenerCfg::port` has been renamed to `EventListenerCfg::add_port` Huge thanks to [hasezoey](https://github.com/hasezoey) for the contributions. diff --git a/examples/demo/app/model.rs b/examples/demo/app/model.rs index cb32478..aa9e317 100644 --- a/examples/demo/app/model.rs +++ b/examples/demo/app/model.rs @@ -77,7 +77,7 @@ where let mut app: Application = Application::init( EventListenerCfg::default() - .crossterm_input_listener(Duration::from_millis(20)) + .crossterm_input_listener(Duration::from_millis(20), 3) .poll_timeout(Duration::from_millis(10)) .tick_interval(Duration::from_secs(1)), ); diff --git a/examples/user_events/user_events.rs b/examples/user_events/user_events.rs index 5eb3be7..8b5aa20 100644 --- a/examples/user_events/user_events.rs +++ b/examples/user_events/user_events.rs @@ -51,10 +51,11 @@ impl Poll for UserDataPort { fn main() { let event_listener = EventListenerCfg::default() - .crossterm_input_listener(Duration::from_millis(10)) - .port( + .crossterm_input_listener(Duration::from_millis(10), 3) + .add_port( Box::new(UserDataPort::default()), Duration::from_millis(1000), + 1, ); let mut app: Application = Application::init(event_listener); diff --git a/src/core/application.rs b/src/core/application.rs index b0f1477..999eb9e 100644 --- a/src/core/application.rs +++ b/src/core/application.rs @@ -1035,9 +1035,10 @@ mod test { } fn listener_config() -> EventListenerCfg { - EventListenerCfg::default().port( + EventListenerCfg::default().add_port( Box::new(MockPoll::::default()), Duration::from_millis(100), + 1, ) } diff --git a/src/listener/builder.rs b/src/listener/builder.rs index 5ebb8a8..c84d748 100644 --- a/src/listener/builder.rs +++ b/src/listener/builder.rs @@ -59,27 +59,45 @@ where self } - /// Add a new Port (Poll, Interval) to the the event listener - pub fn port(mut self, poll: Box>, interval: Duration) -> Self { - self.ports.push(Port::new(poll, interval)); + /// Add a new [`Port`] (Poll, Interval) to the the event listener. + /// + /// The interval is the amount of time between each [`Poll::poll`] call. + /// The max_poll is the maximum amount of times the port should be polled in a single poll. + pub fn add_port(self, poll: Box>, interval: Duration, max_poll: usize) -> Self { + self.port(Port::new(poll, interval, max_poll)) + } + + /// Add a new [`Port`] to the the event listener + /// + /// The [`Port`] needs to be manually constructed, unlike [`Self::add_port`] + pub fn port(mut self, port: Port) -> Self { + self.ports.push(port); self } #[cfg(feature = "crossterm")] /// Add to the event listener the default crossterm input listener [`crate::terminal::CrosstermInputListener`] - pub fn crossterm_input_listener(self, interval: Duration) -> Self { - self.port( + /// + /// The interval is the amount of time between each [`Poll::poll`] call. + /// The max_poll is the maximum amount of times the port should be polled in a single poll. + pub fn crossterm_input_listener(self, interval: Duration, max_poll: usize) -> Self { + self.add_port( Box::new(crate::terminal::CrosstermInputListener::::new(interval)), interval, + max_poll, ) } #[cfg(feature = "termion")] /// Add to the event listener the default termion input listener [`crate::terminal::TermionInputListener`] - pub fn termion_input_listener(self, interval: Duration) -> Self { - self.port( + /// + /// The interval is the amount of time between each [`Poll::poll`] call. + /// The max_poll is the maximum amount of times the port should be polled in a single poll. + pub fn termion_input_listener(self, interval: Duration, max_poll: usize) -> Self { + self.add_port( Box::new(crate::terminal::TermionInputListener::::new(interval)), interval, + max_poll, ) } } @@ -104,8 +122,8 @@ mod test { let builder = builder.poll_timeout(Duration::from_millis(50)); assert_eq!(builder.poll_timeout, Duration::from_millis(50)); let builder = builder - .crossterm_input_listener(Duration::from_millis(200)) - .port(Box::new(MockPoll::default()), Duration::from_secs(300)); + .crossterm_input_listener(Duration::from_millis(200), 1) + .add_port(Box::new(MockPoll::default()), Duration::from_secs(300), 1); assert_eq!(builder.ports.len(), 2); let mut listener = builder.start(); assert!(listener.stop().is_ok()); @@ -123,8 +141,8 @@ mod test { let builder = builder.poll_timeout(Duration::from_millis(50)); assert_eq!(builder.poll_timeout, Duration::from_millis(50)); let builder = builder - .termion_input_listener(Duration::from_millis(200)) - .port(Box::new(MockPoll::default()), Duration::from_secs(300)); + .termion_input_listener(Duration::from_millis(200), 1) + .add_port(Box::new(MockPoll::default()), Duration::from_secs(300), 1); assert_eq!(builder.ports.len(), 2); let mut listener = builder.start(); assert!(listener.stop().is_ok()); @@ -137,4 +155,16 @@ mod test { .poll_timeout(Duration::from_secs(0)) .start(); } + + #[test] + fn should_add_port_via_port_1() { + let builder = EventListenerCfg::::default(); + assert!(builder.ports.is_empty()); + let builder = builder.port(Port::new( + Box::new(MockPoll::default()), + Duration::from_millis(1), + 1, + )); + assert_eq!(builder.ports.len(), 1); + } } diff --git a/src/listener/mod.rs b/src/listener/mod.rs index bd51214..9dc8106 100644 --- a/src/listener/mod.rs +++ b/src/listener/mod.rs @@ -247,6 +247,7 @@ mod test { vec![Port::new( Box::new(MockPoll::default()), Duration::from_secs(10), + 1, )], Duration::from_millis(10), Some(Duration::from_secs(3)), diff --git a/src/listener/port.rs b/src/listener/port.rs index 2b8f1bd..326eb3b 100644 --- a/src/listener/port.rs +++ b/src/listener/port.rs @@ -17,21 +17,34 @@ where poll: Box>, interval: Duration, next_poll: Instant, + max_poll: usize, } impl Port where U: Eq + PartialEq + Clone + PartialOrd + Send + 'static, { - /// Define a new `Port` - pub fn new(poll: Box>, interval: Duration) -> Self { + /// Define a new [`Port`] + /// + /// # Parameters + /// + /// * `poll` - The poll trait object + /// * `interval` - The interval between each poll + /// * `max_poll` - The maximum amount of times the port should be polled in a single poll + pub fn new(poll: Box>, interval: Duration, max_poll: usize) -> Self { Self { poll, interval, next_poll: Instant::now(), + max_poll, } } + /// Get how often a port should get polled in a single poll + pub fn max_poll(&self) -> usize { + self.max_poll + } + /// Returns the interval for the current [`Port`] pub fn interval(&self) -> &Duration { &self.interval @@ -69,7 +82,7 @@ mod test { #[test] fn test_single_listener() { let mut listener = - Port::::new(Box::new(MockPoll::default()), Duration::from_secs(5)); + Port::::new(Box::new(MockPoll::default()), Duration::from_secs(5), 1); assert!(listener.next_poll() <= Instant::now()); assert_eq!(listener.should_poll(), true); assert!(listener.poll().ok().unwrap().is_some()); diff --git a/src/listener/worker.rs b/src/listener/worker.rs index 165f76c..1f6ada9 100644 --- a/src/listener/worker.rs +++ b/src/listener/worker.rs @@ -118,35 +118,32 @@ where /// Returns only the messages, while the None returned by poll are discarded #[allow(clippy::needless_collect)] fn poll(&mut self) -> Result<(), mpsc::SendError>> { - let msg: Vec> = self - .ports - .iter_mut() - .filter_map(|x| { - if x.should_poll() { - let msg = match x.poll() { - Ok(Some(ev)) => Some(ListenerMsg::User(ev)), - Ok(None) => None, - Err(err) => Some(ListenerMsg::Error(err)), - }; - // Update next poll - x.calc_next_poll(); - msg - } else { - None + let port_iter = self.ports.iter_mut().filter(|port| port.should_poll()); + + for port in port_iter { + let mut times_remaining = port.max_poll(); + // poll a port until it has nothing anymore + loop { + let msg = match port.poll() { + Ok(Some(ev)) => ListenerMsg::User(ev), + Ok(None) => break, + Err(err) => ListenerMsg::Error(err), + }; + + self.sender.send(msg)?; + + // do this at the end to at least call it once + times_remaining = times_remaining.saturating_sub(1); + + if times_remaining == 0 { + break; } - }) - .collect(); - // Send messages - match msg - .into_iter() - .map(|x| self.sender.send(x)) - .filter(|x| x.is_err()) - .map(|x| x.err().unwrap()) - .next() - { - None => Ok(()), - Some(e) => Err(e), + } + // Update next poll + port.calc_next_poll(); } + + Ok(()) } /// thread run method @@ -186,6 +183,29 @@ mod test { use crate::mock::{MockEvent, MockPoll}; use crate::Event; + #[test] + fn worker_should_poll_multiple_times() { + let (tx, rx) = mpsc::channel(); + let paused = Arc::new(RwLock::new(false)); + let paused_t = Arc::clone(&paused); + let running = Arc::new(RwLock::new(true)); + let running_t = Arc::clone(&running); + + let mock_port = Port::new(Box::new(MockPoll::default()), Duration::from_secs(5), 10); + + let mut worker = + EventListenerWorker::::new(vec![mock_port], tx, paused_t, running_t, None); + assert!(worker.poll().is_ok()); + assert!(worker.next_event() <= Duration::from_secs(5)); + let mut recieved = Vec::new(); + + while let Ok(msg) = rx.try_recv() { + recieved.push(msg); + } + + assert_eq!(recieved.len(), 10); + } + #[test] fn worker_should_send_poll() { let (tx, rx) = mpsc::channel(); @@ -197,6 +217,7 @@ mod test { vec![Port::new( Box::new(MockPoll::default()), Duration::from_secs(5), + 1, )], tx, paused_t, @@ -223,6 +244,7 @@ mod test { vec![Port::new( Box::new(MockPoll::default()), Duration::from_secs(5), + 1, )], tx, paused_t, @@ -249,6 +271,7 @@ mod test { vec![Port::new( Box::new(MockPoll::default()), Duration::from_secs(5), + 1, )], tx, paused_t, @@ -293,6 +316,7 @@ mod test { vec![Port::new( Box::new(MockPoll::default()), Duration::from_secs(3), + 1, )], tx, paused_t,