From 6319d81fee9e5c1705867cfd348bfa5516d83fb8 Mon Sep 17 00:00:00 2001 From: Daniel McNab <36049421+DJMcNab@users.noreply.github.com> Date: Sat, 9 Sep 2023 09:57:32 +0100 Subject: [PATCH] Add opt-in handlers for before sleeping and before handling events (#144) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add opt-in handlers for before sleeping and before handling events * Remove pre_run and fix some missed impls * Appease the great 📎 * Use an associated constant instead * Do the borrow outside the loop * Fix cfged out mistake * Update documentation to reflect changes * Actually use the synthetic events * Add some tests for the new functionality * Pin `nix` to a lower version * Rename `before_will_sleep` to `before_sleep` * Add some more coverage * Solve review comments * Restore inconsistencies in formatting * Remove AdditionalLifetimeEventsRegister * Remove unused lifetime * Unencapsulate the refcell * Address review comments * Exclude synthetic events from the iterator * Follow clippy's improvement (?) --- CHANGELOG.md | 6 +- src/io.rs | 31 ++- src/loop_logic.rs | 479 +++++++++++++++++++++++++++++++++++++++------ src/sources/mod.rs | 198 +++++++++++++------ src/sys.rs | 6 + 5 files changed, 589 insertions(+), 131 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d4f48dbd..6eaf7549 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,8 +6,10 @@ - Bump MSRV to 1.63 - Make signals an optional feature under the `signals` features. -- Replace the `nix` crate with standard library I/O errors and the `rustix` -crate. +- Replace the `nix` crate with standard library I/O errors and the `rustix` crate. +- `pre_run` and `post_run` on `EventSource` have been replaced with `before_sleep` and `before_handle_events`, respectively. + These are now opt-in through the `NEEDS_EXTRA_LIFECYCLE_EVENTS` associated constant, and occur at slightly different times to + the methods they are replacing. This allows greater compatibility with Wayland based event sources. ## 0.11.0 -- 2023-06-05 diff --git a/src/io.rs b/src/io.rs index e9e3a467..587b8885 100644 --- a/src/io.rs +++ b/src/io.rs @@ -18,11 +18,13 @@ use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags}; #[cfg(feature = "futures-io")] use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut}; +use crate::loop_logic::EventIterator; use crate::{ loop_logic::{LoopInner, MAX_SOURCES_MASK}, sources::EventDispatcher, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory, }; +use crate::{AdditionalLifecycleEventsSet, RegistrationToken}; /// Adapter for async IO manipulations /// @@ -237,17 +239,32 @@ impl EventDispatcher for RefCell { Ok(PostAction::Continue) } - fn register(&self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> { + fn register( + &self, + _: &mut Poll, + _: &mut AdditionalLifecycleEventsSet, + _: &mut TokenFactory, + ) -> crate::Result<()> { // registration is handled by IoLoopInner unreachable!() } - fn reregister(&self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result { + fn reregister( + &self, + _: &mut Poll, + _: &mut AdditionalLifecycleEventsSet, + _: &mut TokenFactory, + ) -> crate::Result { // registration is handled by IoLoopInner unreachable!() } - fn unregister(&self, poll: &mut Poll) -> crate::Result { + fn unregister( + &self, + poll: &mut Poll, + _: &mut AdditionalLifecycleEventsSet, + _: RegistrationToken, + ) -> crate::Result { let disp = self.borrow(); if disp.is_registered { poll.unregister(unsafe { BorrowedFd::borrow_raw(disp.fd) })?; @@ -255,12 +272,10 @@ impl EventDispatcher for RefCell { Ok(true) } - fn pre_run(&self, _data: &mut Data) -> crate::Result<()> { - Ok(()) - } - fn post_run(&self, _data: &mut Data) -> crate::Result<()> { - Ok(()) + fn before_sleep(&self) -> crate::Result> { + Ok(None) } + fn before_handle_events(&self, _: EventIterator<'_>) {} } /* diff --git a/src/loop_logic.rs b/src/loop_logic.rs index 7d3d4cf6..508cc027 100644 --- a/src/loop_logic.rs +++ b/src/loop_logic.rs @@ -1,10 +1,10 @@ use std::cell::{Cell, RefCell}; use std::fmt::Debug; -use std::io; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; +use std::{io, slice}; #[cfg(feature = "block_on")] use std::future::Future; @@ -13,8 +13,11 @@ use io_lifetimes::AsFd; use slab::Slab; use crate::sources::{Dispatcher, EventSource, Idle, IdleDispatcher}; -use crate::sys::Notifier; -use crate::{EventDispatcher, InsertError, Poll, PostAction, TokenFactory}; +use crate::sys::{Notifier, PollEvent}; +use crate::{ + AdditionalLifecycleEventsSet, EventDispatcher, InsertError, Poll, PostAction, Readiness, Token, + TokenFactory, +}; type IdleCallback<'i, Data> = Rc + 'i>>; @@ -53,9 +56,19 @@ pub struct RegistrationToken { key: usize, } +impl RegistrationToken { + /// Create the RegistrationToken corresponding to the given raw key + /// This is needed because some methods use `RegistrationToken`s as + /// raw usizes within this crate + pub(crate) fn new(key: usize) -> Self { + Self { key } + } +} + pub(crate) struct LoopInner<'l, Data> { pub(crate) poll: RefCell, pub(crate) sources: RefCell + 'l>>>, + pub(crate) sources_with_additional_lifecycle_events: RefCell, idles: RefCell>>, pending_action: Cell, } @@ -135,10 +148,14 @@ impl<'l, Data> LoopHandle<'l, Data> { } let key = sources.insert(dispatcher.clone_as_event_dispatcher()); - let ret = sources - .get(key) - .unwrap() - .register(&mut poll, &mut TokenFactory::new(key)); + let ret = sources.get(key).unwrap().register( + &mut poll, + &mut self + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(), + &mut TokenFactory::new(key), + ); if let Err(error) = ret { sources.try_remove(key).expect("Source was just inserted?!"); @@ -172,6 +189,10 @@ impl<'l, Data> LoopHandle<'l, Data> { if let Some(source) = self.inner.sources.borrow().get(token.key) { source.register( &mut self.inner.poll.borrow_mut(), + &mut self + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(), &mut TokenFactory::new(token.key), )?; } @@ -186,6 +207,10 @@ impl<'l, Data> LoopHandle<'l, Data> { if let Some(source) = self.inner.sources.borrow().get(token.key) { if !source.reregister( &mut self.inner.poll.borrow_mut(), + &mut self + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(), &mut TokenFactory::new(token.key), )? { // we are in a callback, store for later processing @@ -200,7 +225,14 @@ impl<'l, Data> LoopHandle<'l, Data> { /// The source remains in the event loop, but it'll no longer generate events pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> { if let Some(source) = self.inner.sources.borrow().get(token.key) { - if !source.unregister(&mut self.inner.poll.borrow_mut())? { + if !source.unregister( + &mut self.inner.poll.borrow_mut(), + &mut self + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(), + *token, + )? { // we are in a callback, store for later processing self.inner.pending_action.set(PostAction::Disable); } @@ -211,7 +243,14 @@ impl<'l, Data> LoopHandle<'l, Data> { /// Removes this source from the event loop. pub fn remove(&self, token: RegistrationToken) { if let Some(source) = self.inner.sources.borrow_mut().try_remove(token.key) { - if let Err(e) = source.unregister(&mut self.inner.poll.borrow_mut()) { + if let Err(e) = source.unregister( + &mut self.inner.poll.borrow_mut(), + &mut self + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(), + token, + ) { log::warn!( "[calloop] Failed to unregister source from the polling system: {:?}", e @@ -238,6 +277,8 @@ impl<'l, Data> LoopHandle<'l, Data> { pub struct EventLoop<'l, Data> { handle: LoopHandle<'l, Data>, signals: Arc, + // A caching vector for synthetic poll events + synthetic_events: Vec, } impl<'l, Data> std::fmt::Debug for EventLoop<'l, Data> { @@ -269,6 +310,7 @@ impl<'l, Data> EventLoop<'l, Data> { sources: RefCell::new(Slab::new()), idles: RefCell::new(Vec::new()), pending_action: Cell::new(PostAction::Continue), + sources_with_additional_lifecycle_events: Default::default(), }), }; @@ -279,6 +321,7 @@ impl<'l, Data> EventLoop<'l, Data> { #[cfg(feature = "block_on")] future_ready: AtomicBool::new(false), }), + synthetic_events: vec![], }) } @@ -293,6 +336,25 @@ impl<'l, Data> EventLoop<'l, Data> { data: &mut Data, ) -> crate::Result<()> { let now = Instant::now(); + { + let mut extra_lifecycle_sources = self + .handle + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(); + let sources = &self.handle.inner.sources.borrow(); + for source in &mut *extra_lifecycle_sources.values { + if let Some(disp) = sources.get(source.key) { + if let Some((readiness, token)) = disp.before_sleep()? { + // Wake up instantly after polling if we recieved an event + timeout = Some(Duration::ZERO); + self.synthetic_events.push(PollEvent { readiness, token }); + } + } else { + unreachable!() + } + } + } let events = { let poll = self.handle.inner.poll.borrow(); loop { @@ -315,8 +377,28 @@ impl<'l, Data> EventLoop<'l, Data> { }; } }; + { + let mut extra_lifecycle_sources = self + .handle + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(); + if !extra_lifecycle_sources.values.is_empty() { + for source in &mut *extra_lifecycle_sources.values { + if let Some(disp) = self.handle.inner.sources.borrow().get(source.key) { + let iter = EventIterator { + inner: self.synthetic_events.iter(), + registration_token: *source, + }; + disp.before_handle_events(iter); + } else { + unreachable!() + } + } + } + } - for event in events { + for event in self.synthetic_events.drain(..).chain(events) { // Get the registration token associated with the event. let registroken_token = event.token.key & MAX_SOURCES_MASK; @@ -345,11 +427,24 @@ impl<'l, Data> EventLoop<'l, Data> { PostAction::Reregister => { disp.reregister( &mut self.handle.inner.poll.borrow_mut(), + &mut self + .handle + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(), &mut TokenFactory::new(event.token.key), )?; } PostAction::Disable => { - disp.unregister(&mut self.handle.inner.poll.borrow_mut())?; + disp.unregister( + &mut self.handle.inner.poll.borrow_mut(), + &mut self + .handle + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(), + RegistrationToken::new(registroken_token), + )?; } PostAction::Remove => { // delete the source from the list, it'll be cleaned up with the if just below @@ -371,7 +466,15 @@ impl<'l, Data> EventLoop<'l, Data> { { // the source has been removed from within its callback, unregister it let mut poll = self.handle.inner.poll.borrow_mut(); - if let Err(e) = disp.unregister(&mut poll) { + if let Err(e) = disp.unregister( + &mut poll, + &mut self + .handle + .inner + .sources_with_additional_lifecycle_events + .borrow_mut(), + RegistrationToken::new(registroken_token), + ) { log::warn!( "[calloop] Failed to unregister source from the polling system: {:?}", e @@ -396,40 +499,6 @@ impl<'l, Data> EventLoop<'l, Data> { } } - fn invoke_pre_run(&self, data: &mut Data) -> crate::Result<()> { - let sources = self - .handle - .inner - .sources - .borrow() - .iter() - .map(|(_, source)| source.clone()) - .collect::>(); - - for source in sources { - source.pre_run(data)?; - } - - Ok(()) - } - - fn invoke_post_run(&self, data: &mut Data) -> crate::Result<()> { - let sources = self - .handle - .inner - .sources - .borrow() - .iter() - .map(|(_, source)| source.clone()) - .collect::>(); - - for source in sources { - source.post_run(data)?; - } - - Ok(()) - } - /// Dispatch pending events to their callbacks /// /// If some sources have events available, their callbacks will be immediatly called. @@ -443,10 +512,8 @@ impl<'l, Data> EventLoop<'l, Data> { timeout: D, data: &mut Data, ) -> crate::Result<()> { - self.invoke_pre_run(data)?; self.dispatch_events(timeout.into(), data)?; self.dispatch_idles(data); - self.invoke_post_run(data)?; Ok(()) } @@ -481,13 +548,10 @@ impl<'l, Data> EventLoop<'l, Data> { { let timeout = timeout.into(); self.signals.stop.store(false, Ordering::Release); - self.invoke_pre_run(data)?; while !self.signals.stop.load(Ordering::Acquire) { - self.dispatch_events(timeout, data)?; - self.dispatch_idles(data); + self.dispatch(timeout, data)?; cb(data); } - self.invoke_post_run(data)?; Ok(()) } @@ -541,8 +605,6 @@ impl<'l, Data> EventLoop<'l, Data> { self.signals.stop.store(false, Ordering::Release); self.signals.future_ready.store(true, Ordering::Release); - self.invoke_pre_run(data)?; - while !self.signals.stop.load(Ordering::Acquire) { // If the future is ready to be polled, poll it. if self.signals.future_ready.swap(false, Ordering::AcqRel) { @@ -559,11 +621,35 @@ impl<'l, Data> EventLoop<'l, Data> { cb(data); } - self.invoke_post_run(data)?; Ok(output) } } +#[derive(Clone, Debug)] +/// The EventIterator is an `Iterator` over the events relevant to a particular source +/// This type is used in the [`EventSource::before_handle_events`] methods for +/// two main reasons: +/// - To avoid dynamic dispatch overhead +/// - Secondly, it is to allow this type to be `Clone`, which is not +/// possible with dynamic dispatch +pub struct EventIterator<'a> { + inner: slice::Iter<'a, PollEvent>, + registration_token: RegistrationToken, +} + +impl<'a> Iterator for EventIterator<'a> { + type Item = (Readiness, Token); + + fn next(&mut self) -> Option { + for next in self.inner.by_ref() { + if next.token.key & MAX_SOURCES_MASK == self.registration_token.key { + return Some((next.readiness, next.token)); + } + } + None + } +} + /// A signal that can be shared between thread to stop or wakeup a running /// event loop #[derive(Clone)] @@ -603,10 +689,13 @@ impl LoopSignal { #[cfg(test)] mod tests { - use std::time::Duration; + use std::{cell::Cell, rc::Rc, time::Duration}; use crate::{ - generic::Generic, ping::*, Dispatcher, Interest, Mode, Poll, PostAction, Readiness, + channel::{channel, Channel}, + generic::Generic, + ping::*, + Dispatcher, EventIterator, EventSource, Interest, Mode, Poll, PostAction, Readiness, RegistrationToken, Token, TokenFactory, }; @@ -680,6 +769,282 @@ mod tests { event_loop.run(None, &mut (), |_| {}).unwrap(); } + #[test] + fn additional_events() { + let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap(); + let mut lock = Lock { + lock: Rc::new((Cell::new(false), Cell::new(0))), + }; + let (sender, channel) = channel(); + let token = event_loop + .handle() + .insert_source( + LockingSource { + channel, + lock: lock.clone(), + }, + |_, _, lock| { + lock.lock(); + lock.unlock(); + }, + ) + .unwrap(); + sender.send(()).unwrap(); + + event_loop.dispatch(None, &mut lock).unwrap(); + // We should have been locked twice so far + assert_eq!(lock.lock.1.get(), 2); + event_loop.handle().disable(&token).unwrap(); + event_loop + .dispatch(Some(Duration::ZERO), &mut lock) + .unwrap(); + assert_eq!(lock.lock.1.get(), 2); + + event_loop.handle().enable(&token).unwrap(); + event_loop + .dispatch(Some(Duration::ZERO), &mut lock) + .unwrap(); + assert_eq!(lock.lock.1.get(), 3); + event_loop.handle().remove(token); + event_loop + .dispatch(Some(Duration::ZERO), &mut lock) + .unwrap(); + assert_eq!(lock.lock.1.get(), 3); + + #[derive(Clone)] + struct Lock { + lock: Rc<(Cell, Cell)>, + } + impl Lock { + fn lock(&self) { + if self.lock.0.get() { + panic!(); + } + // Increase the count + self.lock.1.set(self.lock.1.get() + 1); + self.lock.0.set(true) + } + fn unlock(&self) { + if !self.lock.0.get() { + panic!(); + } + self.lock.0.set(false); + } + } + struct LockingSource { + channel: Channel<()>, + lock: Lock, + } + impl EventSource for LockingSource { + type Event = as EventSource>::Event; + + type Metadata = as EventSource>::Metadata; + + type Ret = as EventSource>::Ret; + + type Error = as EventSource>::Error; + + fn process_events( + &mut self, + readiness: Readiness, + token: Token, + callback: F, + ) -> Result + where + F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, + { + self.channel.process_events(readiness, token, callback) + } + + fn register( + &mut self, + poll: &mut Poll, + token_factory: &mut TokenFactory, + ) -> crate::Result<()> { + self.channel.register(poll, token_factory) + } + + fn reregister( + &mut self, + poll: &mut Poll, + token_factory: &mut TokenFactory, + ) -> crate::Result<()> { + self.channel.reregister(poll, token_factory) + } + + fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { + self.channel.unregister(poll) + } + + const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true; + + fn before_sleep(&mut self) -> crate::Result> { + self.lock.lock(); + Ok(None) + } + + fn before_handle_events(&mut self, _: EventIterator) { + self.lock.unlock(); + } + } + } + #[test] + fn default_additional_events() { + let (sender, channel) = channel(); + let mut test_source = NoopWithDefaultHandlers { channel }; + let mut event_loop = EventLoop::try_new().unwrap(); + event_loop + .handle() + .insert_source(Box::new(&mut test_source), |_, _, _| {}) + .unwrap(); + sender.send(()).unwrap(); + + event_loop.dispatch(None, &mut ()).unwrap(); + struct NoopWithDefaultHandlers { + channel: Channel<()>, + } + impl EventSource for NoopWithDefaultHandlers { + type Event = as EventSource>::Event; + + type Metadata = as EventSource>::Metadata; + + type Ret = as EventSource>::Ret; + + type Error = as EventSource>::Error; + + fn process_events( + &mut self, + readiness: Readiness, + token: Token, + callback: F, + ) -> Result + where + F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, + { + self.channel.process_events(readiness, token, callback) + } + + fn register( + &mut self, + poll: &mut Poll, + token_factory: &mut TokenFactory, + ) -> crate::Result<()> { + self.channel.register(poll, token_factory) + } + + fn reregister( + &mut self, + poll: &mut Poll, + token_factory: &mut TokenFactory, + ) -> crate::Result<()> { + self.channel.reregister(poll, token_factory) + } + + fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { + self.channel.unregister(poll) + } + + const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true; + } + } + + #[test] + fn additional_events_synthetic() { + let mut event_loop: EventLoop<'_, Lock> = EventLoop::try_new().unwrap(); + let mut lock = Lock { + lock: Rc::new(Cell::new(false)), + }; + event_loop + .handle() + .insert_source( + InstantWakeupLockingSource { + lock: lock.clone(), + token: None, + }, + |_, _, lock| { + lock.lock(); + lock.unlock(); + }, + ) + .unwrap(); + + // Loop should finish, as + event_loop.dispatch(None, &mut lock).unwrap(); + #[derive(Clone)] + struct Lock { + lock: Rc>, + } + impl Lock { + fn lock(&self) { + if self.lock.get() { + panic!(); + } + self.lock.set(true) + } + fn unlock(&self) { + if !self.lock.get() { + panic!(); + } + self.lock.set(false); + } + } + struct InstantWakeupLockingSource { + lock: Lock, + token: Option, + } + impl EventSource for InstantWakeupLockingSource { + type Event = (); + + type Metadata = (); + + type Ret = (); + + type Error = as EventSource>::Error; + + fn process_events( + &mut self, + _: Readiness, + token: Token, + mut callback: F, + ) -> Result + where + F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, + { + assert_eq!(token, self.token.unwrap()); + callback((), &mut ()); + Ok(PostAction::Continue) + } + + fn register( + &mut self, + _: &mut Poll, + token_factory: &mut TokenFactory, + ) -> crate::Result<()> { + self.token = Some(token_factory.token()); + Ok(()) + } + + fn reregister(&mut self, _: &mut Poll, _: &mut TokenFactory) -> crate::Result<()> { + unreachable!() + } + + fn unregister(&mut self, _: &mut Poll) -> crate::Result<()> { + unreachable!() + } + + const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = true; + + fn before_sleep(&mut self) -> crate::Result> { + self.lock.lock(); + Ok(Some((Readiness::EMPTY, self.token.unwrap()))) + } + + fn before_handle_events(&mut self, _: EventIterator) { + self.lock.unlock(); + } + } + } + #[test] fn insert_bad_source() { use std::os::unix::io::FromRawFd; diff --git a/src/sources/mod.rs b/src/sources/mod.rs index dd604d26..f314442f 100644 --- a/src/sources/mod.rs +++ b/src/sources/mod.rs @@ -4,7 +4,8 @@ use std::{ rc::Rc, }; -use crate::{sys::TokenFactory, Poll, Readiness, Token}; +pub use crate::loop_logic::EventIterator; +use crate::{sys::TokenFactory, Poll, Readiness, RegistrationToken, Token}; pub mod channel; #[cfg(feature = "executor")] @@ -85,10 +86,17 @@ impl BitOrAssign for PostAction { /// /// In case your event source needs to do some special processing before or after a /// polling session occurs (to prepare the underlying source for polling, and cleanup -/// after that), you can override the `pre_run` and `post_run`, that do nothing by -/// default. Depending on the underlying events, `process_events` may be invoked once, -/// several times, or none at all between `pre_run` and `post_run` are called, but when -/// it is invoked, it'll always be between those two. +/// after that), you can override [`NEEDS_EXTRA_LIFECYCLE_EVENTS`] to `true`. +/// For all sources for which that constant is `true`, the methods [`before_sleep`] and +/// [`before_handle_events`] will be called. +/// [`before_sleep`] is called before the polling system performs a poll operation. +/// [`before_handle_events`] is called before any process_events methods have been called. +/// This means that during `process_events` you can assume that all cleanup has occured on +/// all sources. +/// +/// [`NEEDS_EXTRA_LIFECYCLE_EVENTS`]: EventSource::NEEDS_EXTRA_LIFECYCLE_EVENTS +/// [`before_sleep`]: EventSource::before_sleep +/// [`before_handle_events`]: EventSource::before_handle_events pub trait EventSource { /// The type of events generated by your source. type Event; @@ -157,25 +165,38 @@ pub trait EventSource { /// [`Poll::unregister`](crate::Poll#method.unregister) method. fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()>; - /// Notification that a polling session is going to start + /// Whether this source needs to be sent the [`EventSource::before_sleep`] + /// and [`EventSource::before_handle_events`] notifications. These are opt-in because + /// they require more expensive checks, and almost all sources will not need these notifications + const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = false; + /// Notification that a single `poll` is about to begin /// - /// You can generate events from this method as you would from `process_events`. - fn pre_run(&mut self, _callback: F) -> crate::Result<()> - where - F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - Ok(()) - } - - /// Notification that the current polling session ended + /// Use this to perform operations which must be done before polling, + /// but which may conflict with other event handlers. For example, + /// if polling requires a lock to be taken /// - /// You can generate events from this method as you would from `process_events`. - fn post_run(&mut self, _callback: F) -> crate::Result<()> - where - F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - Ok(()) - } + /// If this returns Ok(Some), this will be treated as an event arriving in polling, and + /// your event handler will be called with the returned `Token` and `Readiness`. + /// Polling will however still occur, but with a timeout of 0, so additional events + /// from this or other sources may also be handled in the same iterations. + /// The returned `Token` must belong to this source + // If you need to return multiple synthetic events from this notification, please + // open an issue + fn before_sleep(&mut self) -> crate::Result> { + Ok(None) + } + /// Notification that polling is complete, and [`EventSource::process_events`] will + /// be called with the given events for this source. The iterator may be empty, + /// which indicates that no events were generated for this source + /// + /// Please note, the iterator excludes any synthetic events returned from + /// [`EventSource::before_sleep`] + /// + /// Use this to perform a cleanup before event handlers with arbitrary + /// code may run. This could be used to drop a lock obtained in + /// [`EventSource::before_sleep`] + #[allow(unused_variables)] + fn before_handle_events(&mut self, events: EventIterator<'_>) {} } /// Blanket implementation for boxed event sources. [`EventSource`] is not an @@ -214,18 +235,14 @@ impl EventSource for Box { T::unregister(&mut **self, poll) } - fn pre_run(&mut self, callback: F) -> crate::Result<()> - where - F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - T::pre_run(&mut **self, callback) + const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = T::NEEDS_EXTRA_LIFECYCLE_EVENTS; + + fn before_sleep(&mut self) -> crate::Result> { + T::before_sleep(&mut **self) } - fn post_run(&mut self, callback: F) -> crate::Result<()> - where - F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - T::post_run(&mut **self, callback) + fn before_handle_events(&mut self, events: EventIterator) { + T::before_handle_events(&mut **self, events) } } @@ -266,24 +283,21 @@ impl EventSource for &mut T { T::unregister(&mut **self, poll) } - fn pre_run(&mut self, callback: F) -> crate::Result<()> - where - F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - T::pre_run(&mut **self, callback) + const NEEDS_EXTRA_LIFECYCLE_EVENTS: bool = T::NEEDS_EXTRA_LIFECYCLE_EVENTS; + + fn before_sleep(&mut self) -> crate::Result> { + T::before_sleep(&mut **self) } - fn post_run(&mut self, callback: F) -> crate::Result<()> - where - F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, - { - T::post_run(&mut **self, callback) + fn before_handle_events(&mut self, events: EventIterator) { + T::before_handle_events(&mut **self, events) } } pub(crate) struct DispatcherInner { source: S, callback: F, + needs_additional_lifecycle_events: bool, } impl EventDispatcher for RefCell> @@ -301,50 +315,71 @@ where let DispatcherInner { ref mut source, ref mut callback, + .. } = *disp; source .process_events(readiness, token, |event, meta| callback(event, meta, data)) .map_err(|e| crate::Error::OtherError(e.into())) } - fn register(&self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.borrow_mut().source.register(poll, token_factory) + fn register( + &self, + poll: &mut Poll, + additional_lifecycle_register: &mut AdditionalLifecycleEventsSet, + token_factory: &mut TokenFactory, + ) -> crate::Result<()> { + let mut this = self.borrow_mut(); + + if this.needs_additional_lifecycle_events { + additional_lifecycle_register.register(token_factory.registration_token()); + } + this.source.register(poll, token_factory) } - fn reregister(&self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result { + fn reregister( + &self, + poll: &mut Poll, + additional_lifecycle_register: &mut AdditionalLifecycleEventsSet, + token_factory: &mut TokenFactory, + ) -> crate::Result { if let Ok(mut me) = self.try_borrow_mut() { me.source.reregister(poll, token_factory)?; + if me.needs_additional_lifecycle_events { + additional_lifecycle_register.register(token_factory.registration_token()); + } Ok(true) } else { Ok(false) } } - fn unregister(&self, poll: &mut Poll) -> crate::Result { + fn unregister( + &self, + poll: &mut Poll, + additional_lifecycle_register: &mut AdditionalLifecycleEventsSet, + registration_token: RegistrationToken, + ) -> crate::Result { if let Ok(mut me) = self.try_borrow_mut() { me.source.unregister(poll)?; + if me.needs_additional_lifecycle_events { + additional_lifecycle_register.unregister(registration_token); + } Ok(true) } else { Ok(false) } } - fn pre_run(&self, data: &mut Data) -> crate::Result<()> { + fn before_sleep(&self) -> crate::Result> { let mut disp = self.borrow_mut(); - let DispatcherInner { - ref mut source, - ref mut callback, - } = *disp; - source.pre_run(|event, meta| callback(event, meta, data)) + let DispatcherInner { ref mut source, .. } = *disp; + source.before_sleep() } - fn post_run(&self, data: &mut Data) -> crate::Result<()> { + fn before_handle_events(&self, events: EventIterator<'_>) { let mut disp = self.borrow_mut(); - let DispatcherInner { - ref mut source, - ref mut callback, - } = *disp; - source.post_run(|event, meta| callback(event, meta, data)) + let DispatcherInner { ref mut source, .. } = *disp; + source.before_handle_events(events); } } @@ -356,15 +391,46 @@ pub(crate) trait EventDispatcher { data: &mut Data, ) -> crate::Result; - fn register(&self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()>; + fn register( + &self, + poll: &mut Poll, + additional_lifecycle_register: &mut AdditionalLifecycleEventsSet, + token_factory: &mut TokenFactory, + ) -> crate::Result<()>; - fn reregister(&self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result; + fn reregister( + &self, + poll: &mut Poll, + additional_lifecycle_register: &mut AdditionalLifecycleEventsSet, + token_factory: &mut TokenFactory, + ) -> crate::Result; - fn unregister(&self, poll: &mut Poll) -> crate::Result; + fn unregister( + &self, + poll: &mut Poll, + additional_lifecycle_register: &mut AdditionalLifecycleEventsSet, + registration_token: RegistrationToken, + ) -> crate::Result; + + fn before_sleep(&self) -> crate::Result>; + fn before_handle_events(&self, events: EventIterator<'_>); +} - fn pre_run(&self, data: &mut Data) -> crate::Result<()>; +#[derive(Default)] +/// The list of events +pub(crate) struct AdditionalLifecycleEventsSet { + /// The list of sources + pub(crate) values: Vec, +} - fn post_run(&self, data: &mut Data) -> crate::Result<()>; +impl AdditionalLifecycleEventsSet { + fn register(&mut self, token: RegistrationToken) { + self.values.push(token) + } + + fn unregister(&mut self, token: RegistrationToken) { + self.values.retain(|it| it != &token) + } } // An internal trait to erase the `F` type parameter of `DispatcherInner` @@ -429,7 +495,11 @@ where where F: FnMut(S::Event, &mut S::Metadata, &mut Data) -> S::Ret + 'a, { - Dispatcher(Rc::new(RefCell::new(DispatcherInner { source, callback }))) + Dispatcher(Rc::new(RefCell::new(DispatcherInner { + source, + callback, + needs_additional_lifecycle_events: S::NEEDS_EXTRA_LIFECYCLE_EVENTS, + }))) } /// Returns an immutable reference to the event source. diff --git a/src/sys.rs b/src/sys.rs index a1ba85b3..db242ae6 100644 --- a/src/sys.rs +++ b/src/sys.rs @@ -16,6 +16,7 @@ use polling::{Event, PollMode, Poller}; use crate::loop_logic::{MAX_SOURCES, MAX_SUBSOURCES_TOTAL}; use crate::sources::timer::TimerWheel; +use crate::RegistrationToken; /// Possible modes for registering a file descriptor #[derive(Copy, Clone, Debug)] @@ -137,6 +138,11 @@ impl TokenFactory { TokenFactory { key, sub_id: 0 } } + /// Get the "raw" registration token of this TokenFactory + pub(crate) fn registration_token(&self) -> RegistrationToken { + RegistrationToken::new(self.key) + } + /// Produce a new unique token pub fn token(&mut self) -> Token { // Ensure we don't overflow the sub-id.