
Implement blocking_recv function for broadcast::Receiver #5662

Closed
Slixe opened this issue Apr 29, 2023 · 3 comments · Fixed by #5690
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request. M-sync Module: tokio/sync

Comments

Slixe commented Apr 29, 2023

Is your feature request related to a problem? Please describe.
I would like to be able to block on the Receiver even in a blocking (synchronous) context, not only in an async context.

Describe the solution you'd like
Add a blocking_recv() function to broadcast::Receiver, as is already done for the other Receiver channel types.

Describe alternatives you've considered
At the moment, I'm calling try_recv() in a loop and sleeping on the Err branch. A block_on could also be used.
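For concreteness, a minimal sketch of that polling workaround, assuming a tokio::sync::broadcast channel; the 10 ms sleep interval and the channel capacity of 16 are arbitrary illustration values:

use std::{thread, time::Duration};
use tokio::sync::broadcast::{self, error::TryRecvError};

fn main() {
    let (tx, mut rx) = broadcast::channel::<u8>(16);

    // Workaround: poll try_recv() from a std thread, sleeping while the
    // channel is empty, until a value arrives or the channel is closed.
    let handle = thread::spawn(move || loop {
        match rx.try_recv() {
            Ok(value) => break Some(value),
            Err(TryRecvError::Empty) => thread::sleep(Duration::from_millis(10)),
            Err(TryRecvError::Closed) => break None,
            Err(TryRecvError::Lagged(_)) => continue,
        }
    });

    tx.send(42).unwrap();
    assert_eq!(handle.join().unwrap(), Some(42));
}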

Additional context
I'm creating std threads for faster jobs, and I don't want to set up an async context for each one just for this.

@Slixe Slixe added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Apr 29, 2023
@Darksonn Darksonn added the M-sync Module: tokio/sync label Apr 29, 2023
debadree25 (Contributor) commented

Hey, I would like to give this a try! @Darksonn, could you give some guidance on which parts of the code would be relevant for this?

Darksonn (Contributor) commented May 3, 2023

Sure. You will need to make your changes in tokio/src/sync/broadcast.rs. You can use the following method from the mpsc channel for inspiration:

/// Blocking receive to call outside of asynchronous contexts.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer. This indicates that no
/// further values can ever be received from this `Receiver`. The channel is
/// closed when all senders have been dropped, or when [`close`] is called.
///
/// If there are no messages in the channel's buffer, but the channel has
/// not yet been closed, this method will block until a message is sent or
/// the channel is closed.
///
/// This method is intended for use cases where you are sending from
/// asynchronous code to synchronous code, and will work even if the sender
/// is not using [`blocking_send`] to send the message.
///
/// Note that if [`close`] is called, but there are still outstanding
/// [`Permits`] from before it was closed, the channel is not considered
/// closed by `blocking_recv` until the permits are released.
///
/// [`close`]: Self::close
/// [`Permits`]: struct@crate::sync::mpsc::Permit
/// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
///
/// # Panics
///
/// This function panics if called within an asynchronous execution
/// context.
///
/// # Examples
///
/// ```
/// use std::thread;
/// use tokio::runtime::Runtime;
/// use tokio::sync::mpsc;
///
/// fn main() {
///     let (tx, mut rx) = mpsc::channel::<u8>(10);
///
///     let sync_code = thread::spawn(move || {
///         assert_eq!(Some(10), rx.blocking_recv());
///     });
///
///     Runtime::new()
///         .unwrap()
///         .block_on(async move {
///             let _ = tx.send(10).await;
///         });
///     sync_code.join().unwrap()
/// }
/// ```
#[track_caller]
#[cfg(feature = "sync")]
#[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
pub fn blocking_recv(&mut self) -> Option<T> {
    crate::future::block_on(self.recv())
}

debadree25 (Contributor) commented

Ah, thank you very much! I will try to open a PR soon!
