Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mpsc: Add Sender::try_reserve function #3418

Merged
merged 3 commits into from
Jan 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,58 @@ impl<T> Sender<T> {

Ok(Permit { chan: &self.chan })
}

/// Try to acquire a slot in the channel without waiting for the slot to become
/// available.
///
/// If the channel is full this function will return [`TrySendError`], otherwise
/// if there is a slot available it will return a [`Permit`] that will then allow you
/// to [`send`] on the channel with a guaranteed slot. This function is similar to
/// [`reserve`] execpt it does not await for the slot to become available.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: :s/execpt/except/g

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PRS WELCOME RORY!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If someone's going to address this typo, consider also mentioning the new try_reserve in Permit's rustdoc (currently it only mentions reserve as returning it). I can submit that PR if nobody else is on it 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go ahead.

///
/// Dropping [`Permit`] without sending a message releases the capacity back
/// to the channel.
///
/// [`Permit`]: Permit
/// [`send`]: Permit::send
/// [`reserve`]: Sender::reserve
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(1);
///
/// // Reserve capacity
/// let permit = tx.try_reserve().unwrap();
///
/// // Trying to send directly on the `tx` will fail due to no
/// // available capacity.
/// assert!(tx.try_send(123).is_err());
///
/// // Trying to reserve an additional slot on the `tx` will
/// // fail because there is no capacity.
/// assert!(tx.try_reserve().is_err());
///
/// // Sending on the permit succeeds
/// permit.send(456);
///
/// // The value sent on the permit is received
/// assert_eq!(rx.recv().await.unwrap(), 456);
///
/// }
/// ```
pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
match self.chan.semaphore().0.try_acquire(1) {
Ok(_) => {}
Err(_) => return Err(TrySendError::Full(())),
}

Ok(Permit { chan: &self.chan })
}
}

impl<T> Clone for Sender<T> {
Expand Down
23 changes: 23 additions & 0 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,29 @@ async fn try_send_fail() {
assert!(rx.recv().await.is_none());
}

#[tokio::test]
async fn try_reserve_fails() {
let (tx, mut rx) = mpsc::channel(1);

let permit = tx.try_reserve().unwrap();

// This should fail
match assert_err!(tx.try_reserve()) {
TrySendError::Full(()) => {}
_ => panic!(),
}

permit.send("foo");

assert_eq!(rx.recv().await, Some("foo"));

// Dropping permit releases the slot.
let permit = tx.try_reserve().unwrap();
drop(permit);

let _permit = tx.try_reserve().unwrap();
}

#[tokio::test]
async fn drop_permit_releases_permit() {
// poll_ready reserves capacity, ensure that the capacity is released if tx
Expand Down