Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #1898 - Permit::release() must be called when Mutex::lock() is aborted even if it is in the Waiting state #1902

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions tokio/src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,24 @@ impl<T> Mutex<T> {

/// A future that resolves on acquiring the lock and returns the `MutexGuard`.
pub async fn lock(&self) -> MutexGuard<'_, T> {
let mut permit = semaphore::Permit::new();
poll_fn(|cx| permit.poll_acquire(cx, &self.s))
let mut guard = MutexGuard {
lock: self,
permit: semaphore::Permit::new(),
};
poll_fn(|cx| guard.permit.poll_acquire(cx, &self.s))
.await
.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});

MutexGuard { lock: self, permit }
guard
}
}

impl<'a, T> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
if self.permit.is_acquired() {
self.permit.release(&self.lock.s);
} else if ::std::thread::panicking() {
// A guard _should_ always hold its permit, but if the thread is already panicking,
// we don't want to generate a panic-while-panicing, since that's just unhelpful!
} else {
unreachable!("Permit not held when MutexGuard was dropped")
}
self.permit.release(&self.lock.s);
hawkw marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
55 changes: 55 additions & 0 deletions tokio/tests/sync_mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
#![cfg(feature = "full")]

use tokio::sync::Mutex;
use tokio::time::{interval, timeout};
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};

use std::sync::Arc;
use std::time::Duration;

#[test]
fn straight_execution() {
Expand Down Expand Up @@ -79,3 +81,56 @@ fn lock() {
assert!(*result);
}
*/

#[tokio::main]
#[test]
/// Ensure a mutex is unlocked if a future holding the lock
/// is aborted prematurely.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It would be good to reference the GitHub issue that these tests reproduce.

async fn aborted_future_1() {
let m1: Arc<Mutex<usize>> = Arc::new(Mutex::new(0));
{
let m2 = m1.clone();
// Try to lock mutex in a future that is aborted prematurely
timeout(Duration::from_millis(1u64), async move {
let mut iv = interval(Duration::from_millis(1000));
m2.lock().await;
iv.tick().await;
iv.tick().await;
Comment on lines +97 to +98
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The use of the interval here is a little un-obvious, it could be good if there was a comment explaining what it's doing here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that the comment Try to lock mutex in a future that is aborted prematurely was telling enough. tbh. That test case is not really needed. I just added it when debugging the code and trying to reproduce the locking behaviour. This test just makes sure Aquired locks are returned when their future is aborted.

Is there a better way to let a future delay its execution than creating an interval and waiting twice? tick().await is called twice as the first call of it returns instantly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better way to let a future delay its execution than creating an interval and waiting twice? tick().await is called twice as the first call of it returns instantly.

You could possibly just use

task::yield_now().await;

if all you want is to yield to the scheduler so that another task will be polled? But, what you're doing now does make sense, I just thought it would be helpful if there was a comment explaining what it was for.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to move forward merging & shipping. We can tweak the test in a follow up PR.

})
.await
.unwrap_err();
}
// This should succeed as there is no lock left for the mutex.
timeout(Duration::from_millis(1u64), async move {
m1.lock().await;
})
.await
.expect("Mutex is locked");
}

#[tokio::main]
#[test]
/// This test is similar to `aborted_future_1` but this time the
/// aborted future is waiting for the lock.
async fn aborted_future_2() {
let m1: Arc<Mutex<usize>> = Arc::new(Mutex::new(0));
{
// Lock mutex
let _lock = m1.lock().await;
{
let m2 = m1.clone();
// Try to lock mutex in a future that is aborted prematurely
timeout(Duration::from_millis(1u64), async move {
m2.lock().await;
})
.await
.unwrap_err();
}
}
// This should succeed as there is no lock left for the mutex.
timeout(Duration::from_millis(1u64), async move {
m1.lock().await;
})
.await
.expect("Mutex is locked");
}