Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose Semaphore::close #3065

Merged
merged 5 commits into from
Dec 15, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions tokio/src/sync/batch_semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,21 @@ struct Waitlist {
closed: bool,
}

/// Error returned by `Semaphore::try_acquire`.
/// Error returned from the `Semaphore::try_acquire` function.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably ought to be a link?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hawkw Correct me if I am wrong, but it seems we cannot link to non public methods. Having this type be located in batch_semaphore and linking to the method in semaphore seems a bit odd. Am I missing something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type is exposed in the public API. Users will only ever see it via the public Semaphore type's methods; they shouldn't need to be aware of batch_semaphore, which is an implementation detail. I think it's correct for this to link to the public methods for readers of the documentation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Thank you !

#[derive(Debug)]
pub(crate) enum TryAcquireError {
pub enum TryAcquireError {
/// The semaphore has been closed and cannot issue new permits
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth linking to the Semaphore::close method here?

Suggested change
/// The semaphore has been closed and cannot issue new permits
/// The semaphore has been [closed] and cannot issue new permits.
///
/// [closed]: crate::sync::Semaphore::close

Closed,

/// The has no available permits.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The semaphore

NoPermits,
}
/// Error returned by `Semaphore::acquire`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and this one?

///
/// An `acquire` operation can only fail if the semaphore has been
/// closed.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be worth linking to the close method here too?

Suggested change
/// closed.
/// [closed].
///
/// [closed]: crate::sync::Semaphore::close

#[derive(Debug)]
pub(crate) struct AcquireError(());
pub struct AcquireError(());

pub(crate) struct Acquire<'a> {
node: Waiter,
Expand Down Expand Up @@ -164,8 +170,6 @@ impl Semaphore {

/// Closes the semaphore. This prevents the semaphore from issuing new
/// permits and notifies all pending waiters.
// This will be used once the bounded MPSC is updated to use the new
// semaphore implementation.
pub(crate) fn close(&self) {
let mut waiters = self.waiters.lock();
// If the semaphore's permits counter has enough permits for an
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,10 @@ cfg_sync! {
pub mod oneshot;

pub(crate) mod batch_semaphore;
pub use batch_semaphore::{AcquireError, TryAcquireError};

mod semaphore;
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit, TryAcquireError};
pub use semaphore::{Semaphore, SemaphorePermit, OwnedSemaphorePermit};

mod rwlock;
pub use rwlock::{RwLock, RwLockReadGuard, RwLockWriteGuard};
Expand Down
47 changes: 23 additions & 24 deletions tokio/src/sync/semaphore.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::batch_semaphore as ll; // low level implementation
use super::{AcquireError, TryAcquireError};
use std::sync::Arc;

/// Counting semaphore performing asynchronous permit acquisition.
Expand Down Expand Up @@ -42,15 +43,6 @@ pub struct OwnedSemaphorePermit {
permits: u32,
}

/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// A `try_acquire` operation can only fail if the semaphore has no available
/// permits.
///
/// [`Semaphore::try_acquire`]: Semaphore::try_acquire
#[derive(Debug)]
pub struct TryAcquireError(());

#[test]
#[cfg(not(loom))]
fn bounds() {
Expand Down Expand Up @@ -96,21 +88,21 @@ impl Semaphore {
}

/// Acquires permit from the semaphore.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be nice if this comment said something about errors, like

Suggested change
/// Acquires permit from the semaphore.
/// Acquires a permit from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permit.

or something?

pub async fn acquire(&self) -> SemaphorePermit<'_> {
self.ll_sem.acquire(1).await.unwrap();
SemaphorePermit {
pub async fn acquire(&self) -> Result<SemaphorePermit<'_>, AcquireError> {
self.ll_sem.acquire(1).await?;
Ok(SemaphorePermit {
sem: &self,
permits: 1,
}
})
}

/// Acquires `n` permits from the semaphore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and maybe this should say something like

Suggested change
/// Acquires `n` permits from the semaphore
/// Acquires `n` permits from the semaphore.
///
/// If the semaphore has been closed, this returns an [`AcquireError`].
/// Otherwise, this returns a [`SemaphorePermit`] representing the
/// acquired permits.

pub async fn acquire_many(&self, n: u32) -> SemaphorePermit<'_> {
self.ll_sem.acquire(n).await.unwrap();
SemaphorePermit {
pub async fn acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, AcquireError> {
self.ll_sem.acquire(n).await?;
Ok(SemaphorePermit {
sem: &self,
permits: n,
}
})
}

/// Tries to acquire a permit from the semaphore.
Expand All @@ -120,7 +112,7 @@ impl Semaphore {
sem: self,
permits: 1,
}),
Err(_) => Err(TryAcquireError(())),
Err(e) => Err(e),
}
}

Expand All @@ -131,7 +123,7 @@ impl Semaphore {
sem: self,
permits: n,
}),
Err(_) => Err(TryAcquireError(())),
Err(e) => Err(e),
}
}

Expand All @@ -140,12 +132,12 @@ impl Semaphore {
/// The semaphore must be wrapped in an [`Arc`] to call this method.
///
hawkw marked this conversation as resolved.
Show resolved Hide resolved
/// [`Arc`]: std::sync::Arc
pub async fn acquire_owned(self: Arc<Self>) -> OwnedSemaphorePermit {
self.ll_sem.acquire(1).await.unwrap();
OwnedSemaphorePermit {
pub async fn acquire_owned(self: Arc<Self>) -> Result<OwnedSemaphorePermit, AcquireError> {
self.ll_sem.acquire(1).await?;
Ok(OwnedSemaphorePermit {
sem: self,
permits: 1,
}
})
}

/// Tries to acquire a permit from the semaphore.
Expand All @@ -159,9 +151,16 @@ impl Semaphore {
sem: self,
permits: 1,
}),
Err(_) => Err(TryAcquireError(())),
Err(e) => Err(e),
}
}

/// Closes the semaphore.
///
/// This prevents the semaphore from issuing new permits and notifies all pending waiters.
hawkw marked this conversation as resolved.
Show resolved Hide resolved
pub fn close(&self) {
carllerche marked this conversation as resolved.
Show resolved Hide resolved
self.ll_sem.close();
}
}

impl<'a> SemaphorePermit<'a> {
Expand Down