Skip to content

Commit

Permalink
sync: fix Mutex when lock future dropped before complete (#1902)
Browse files Browse the repository at this point in the history
The bug caused the mutex to reach a state where it is locked and cannot be unlocked.

Fixes #1898
  • Loading branch information
bikeshedder authored and carllerche committed Dec 6, 2019
1 parent a53f94a commit c632337
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 12 deletions.
19 changes: 7 additions & 12 deletions tokio/src/sync/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,24 @@ impl<T> Mutex<T> {

/// A future that resolves on acquiring the lock and returns the `MutexGuard`.
pub async fn lock(&self) -> MutexGuard<'_, T> {
let mut permit = semaphore::Permit::new();
poll_fn(|cx| permit.poll_acquire(cx, &self.s))
let mut guard = MutexGuard {
lock: self,
permit: semaphore::Permit::new(),
};
poll_fn(|cx| guard.permit.poll_acquire(cx, &self.s))
.await
.unwrap_or_else(|_| {
// The semaphore was closed. but, we never explicitly close it, and we have a
// handle to it through the Arc, which means that this can never happen.
unreachable!()
});

MutexGuard { lock: self, permit }
guard
}
}

impl<'a, T> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
if self.permit.is_acquired() {
self.permit.release(&self.lock.s);
} else if ::std::thread::panicking() {
// A guard _should_ always hold its permit, but if the thread is already panicking,
// we don't want to generate a panic-while-panicing, since that's just unhelpful!
} else {
unreachable!("Permit not held when MutexGuard was dropped")
}
self.permit.release(&self.lock.s);
}
}

Expand Down
55 changes: 55 additions & 0 deletions tokio/tests/sync_mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
#![cfg(feature = "full")]

use tokio::sync::Mutex;
use tokio::time::{interval, timeout};
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready};

use std::sync::Arc;
use std::time::Duration;

#[test]
fn straight_execution() {
Expand Down Expand Up @@ -79,3 +81,56 @@ fn lock() {
assert!(*result);
}
*/

#[tokio::main]
#[test]
/// Ensure a mutex is unlocked if a future holding the lock
/// is aborted prematurely.
async fn aborted_future_1() {
let m1: Arc<Mutex<usize>> = Arc::new(Mutex::new(0));
{
let m2 = m1.clone();
// Try to lock mutex in a future that is aborted prematurely
timeout(Duration::from_millis(1u64), async move {
let mut iv = interval(Duration::from_millis(1000));
m2.lock().await;
iv.tick().await;
iv.tick().await;
})
.await
.unwrap_err();
}
// This should succeed as there is no lock left for the mutex.
timeout(Duration::from_millis(1u64), async move {
m1.lock().await;
})
.await
.expect("Mutex is locked");
}

#[tokio::main]
#[test]
/// This test is similar to `aborted_future_1` but this time the
/// aborted future is waiting for the lock.
async fn aborted_future_2() {
let m1: Arc<Mutex<usize>> = Arc::new(Mutex::new(0));
{
// Lock mutex
let _lock = m1.lock().await;
{
let m2 = m1.clone();
// Try to lock mutex in a future that is aborted prematurely
timeout(Duration::from_millis(1u64), async move {
m2.lock().await;
})
.await
.unwrap_err();
}
}
// This should succeed as there is no lock left for the mutex.
timeout(Duration::from_millis(1u64), async move {
m1.lock().await;
})
.await
.expect("Mutex is locked");
}

0 comments on commit c632337

Please sign in to comment.