From fc3791fe31626f03034c5627de62ff5a77628fbb Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Sat, 16 Mar 2024 23:49:02 +0330 Subject: [PATCH 1/3] expose strong and weak count for mpsc senders --- tokio/src/sync/mpsc/bounded.rs | 26 +++++++ tokio/src/sync/mpsc/chan.rs | 26 +++++++ tokio/src/sync/mpsc/unbounded.rs | 26 +++++++ tokio/tests/sync_mpsc_weak.rs | 129 ++++++++++++++++++++++++++++++- 4 files changed, 206 insertions(+), 1 deletion(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 3cdba3dc237..a5c6cb4523b 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1409,6 +1409,16 @@ impl Sender { pub fn max_capacity(&self) -> usize { self.chan.semaphore().bound } + + /// Returns the number of [`Sender`] handles. + pub fn strong_count(&self) -> usize { + self.chan.strong_count() + } + + /// Returns the number of [`WeakSender`] handles. + pub fn weak_count(&self) -> usize { + self.chan.weak_count() + } } impl Clone for Sender { @@ -1435,6 +1445,12 @@ impl Clone for WeakSender { } } +impl Drop for WeakSender { + fn drop(&mut self) { + self.chan.decrement_weak_count(); + } +} + impl WeakSender { /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some` /// if there are other `Sender` instances alive and the channel wasn't @@ -1442,6 +1458,16 @@ impl WeakSender { pub fn upgrade(&self) -> Option> { chan::Tx::upgrade(self.chan.clone()).map(Sender::new) } + + /// Returns the number of [`Sender`] handles. + pub fn strong_count(&self) -> usize { + self.chan.strong_count() + } + + /// Returns the number of [`WeakSender`] handles. + pub fn weak_count(&self) -> usize { + self.chan.weak_count() + } } impl fmt::Debug for WeakSender { diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index c05a4abb7c0..dfe4d2dceec 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -66,6 +66,9 @@ pub(super) struct Chan { /// When this drops to zero, the send half of the channel is closed. tx_count: AtomicUsize, + /// Tracks the number of outstanding weak sender handles. + tx_weak_count: AtomicUsize, + /// Only accessed by `Rx` handle. rx_fields: UnsafeCell>, } @@ -115,6 +118,7 @@ pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { semaphore, rx_waker: CachePadded::new(AtomicWaker::new()), tx_count: AtomicUsize::new(1), + tx_weak_count: AtomicUsize::new(0), rx_fields: UnsafeCell::new(RxFields { list: rx, rx_closed: false, @@ -131,7 +135,17 @@ impl Tx { Tx { inner: chan } } + pub(super) fn strong_count(&self) -> usize { + self.inner.tx_count.load(Acquire) + } + + pub(super) fn weak_count(&self) -> usize { + self.inner.tx_weak_count.load(Relaxed) + } + pub(super) fn downgrade(&self) -> Arc> { + self.inner.tx_weak_count.fetch_add(1, Relaxed); + self.inner.clone() } @@ -452,6 +466,18 @@ impl Chan { // Notify the rx task self.rx_waker.wake(); } + + pub(super) fn decrement_weak_count(&self) { + self.tx_weak_count.fetch_sub(1, Relaxed); + } + + pub(super) fn strong_count(&self) -> usize { + self.tx_count.load(Acquire) + } + + pub(super) fn weak_count(&self) -> usize { + self.tx_weak_count.load(Relaxed) + } } impl Drop for Chan { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index b87b07ba653..6be29014894 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -578,6 +578,16 @@ impl UnboundedSender { chan: self.chan.downgrade(), } } + + /// Returns the number of [`UnboundedSender`] handles. + pub fn strong_count(&self) -> usize { + self.chan.strong_count() + } + + /// Returns the number of [`WeakUnboundedSender`] handles. + pub fn weak_count(&self) -> usize { + self.chan.weak_count() + } } impl Clone for WeakUnboundedSender { @@ -588,6 +598,12 @@ impl Clone for WeakUnboundedSender { } } +impl Drop for WeakUnboundedSender { + fn drop(&mut self) { + self.chan.decrement_weak_count(); + } +} + impl WeakUnboundedSender { /// Tries to convert a `WeakUnboundedSender` into an [`UnboundedSender`]. /// This will return `Some` if there are other `Sender` instances alive and @@ -595,6 +611,16 @@ impl WeakUnboundedSender { pub fn upgrade(&self) -> Option> { chan::Tx::upgrade(self.chan.clone()).map(UnboundedSender::new) } + + /// Returns the number of [`UnboundedSender`] handles. + pub fn strong_count(&self) -> usize { + self.chan.strong_count() + } + + /// Returns the number of [`WeakUnboundedSender`] handles. + pub fn weak_count(&self) -> usize { + self.chan.weak_count() + } } impl fmt::Debug for WeakUnboundedSender { diff --git a/tokio/tests/sync_mpsc_weak.rs b/tokio/tests/sync_mpsc_weak.rs index fad4c72f799..ac49443c6c2 100644 --- a/tokio/tests/sync_mpsc_weak.rs +++ b/tokio/tests/sync_mpsc_weak.rs @@ -7,8 +7,9 @@ use wasm_bindgen_test::wasm_bindgen_test as test; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, Release}; +use std::sync::Arc; use tokio::sync::mpsc::{self, channel, unbounded_channel}; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, Notify}; #[tokio::test] async fn weak_sender() { @@ -511,3 +512,129 @@ fn test_tx_count_weak_unbounded_sender() { assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none()); } + +#[tokio::test] +async fn strong_and_weak_count() { + let (tx, _rx) = mpsc::channel::<()>(1); + + let first_downgrade = Arc::new(Notify::new()); + let second_downgrade = Arc::new(Notify::new()); + + let task_handle = { + let tx = tx.clone(); + let first_downgrade = first_downgrade.clone(); + let second_downgrade = second_downgrade.clone(); + + tokio::spawn(async move { + let weak = tx.downgrade(); + + assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak.weak_count(), 1); + assert_eq!(weak.strong_count(), 2); + + first_downgrade.notify_one(); + + second_downgrade.notified().await; + + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.strong_count(), 2); + + drop(weak); + + assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.strong_count(), 2); + + drop(tx); + }) + }; + + first_downgrade.notified().await; + + assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.strong_count(), 2); + + let weak = tx.downgrade(); + + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.strong_count(), 2); + + second_downgrade.notify_one(); + + task_handle.await.unwrap(); + + assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.strong_count(), 1); + + drop(weak); + + assert_eq!(tx.weak_count(), 0); + assert_eq!(tx.strong_count(), 1); +} + +#[tokio::test] +async fn unbounded_strong_and_weak_count() { + let (tx, _rx) = mpsc::unbounded_channel::<()>(); + + let first_downgrade = Arc::new(Notify::new()); + let second_downgrade = Arc::new(Notify::new()); + + let task_handle = { + let tx = tx.clone(); + let first_downgrade = first_downgrade.clone(); + let second_downgrade = second_downgrade.clone(); + + tokio::spawn(async move { + let weak = tx.downgrade(); + + assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak.weak_count(), 1); + assert_eq!(weak.strong_count(), 2); + + first_downgrade.notify_one(); + + second_downgrade.notified().await; + + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.strong_count(), 2); + + drop(weak); + + assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.strong_count(), 2); + + drop(tx); + }) + }; + + first_downgrade.notified().await; + + assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.strong_count(), 2); + + let weak = tx.downgrade(); + + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.strong_count(), 2); + + second_downgrade.notify_one(); + + task_handle.await.unwrap(); + + assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.strong_count(), 1); + + drop(weak); + + assert_eq!(tx.weak_count(), 0); + assert_eq!(tx.strong_count(), 1); +} From afeb31c221188261a789badb41db0584f8999b8d Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Sun, 17 Mar 2024 00:36:46 +0330 Subject: [PATCH 2/3] increment weak count when cloning a weak handle --- tokio/src/sync/mpsc/bounded.rs | 2 ++ tokio/src/sync/mpsc/chan.rs | 6 +++- tokio/src/sync/mpsc/unbounded.rs | 2 ++ tokio/tests/sync_mpsc_weak.rs | 52 ++++++++++++++++++++++++++------ 4 files changed, 51 insertions(+), 11 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index a5c6cb4523b..b7b1ce7f623 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1439,6 +1439,8 @@ impl fmt::Debug for Sender { impl Clone for WeakSender { fn clone(&self) -> Self { + self.chan.increment_weak_count(); + WeakSender { chan: self.chan.clone(), } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index dfe4d2dceec..179a69f5700 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -144,7 +144,7 @@ impl Tx { } pub(super) fn downgrade(&self) -> Arc> { - self.inner.tx_weak_count.fetch_add(1, Relaxed); + self.inner.increment_weak_count(); self.inner.clone() } @@ -471,6 +471,10 @@ impl Chan { self.tx_weak_count.fetch_sub(1, Relaxed); } + pub(super) fn increment_weak_count(&self) { + self.tx_weak_count.fetch_add(1, Relaxed); + } + pub(super) fn strong_count(&self) -> usize { self.tx_count.load(Acquire) } diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 6be29014894..e5ef0adef38 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -592,6 +592,8 @@ impl UnboundedSender { impl Clone for WeakUnboundedSender { fn clone(&self) -> Self { + self.chan.increment_weak_count(); + WeakUnboundedSender { chan: self.chan.clone(), } diff --git a/tokio/tests/sync_mpsc_weak.rs b/tokio/tests/sync_mpsc_weak.rs index ac49443c6c2..914e3bea86c 100644 --- a/tokio/tests/sync_mpsc_weak.rs +++ b/tokio/tests/sync_mpsc_weak.rs @@ -533,17 +533,33 @@ async fn strong_and_weak_count() { assert_eq!(weak.weak_count(), 1); assert_eq!(weak.strong_count(), 2); + let weak2 = weak.clone(); + + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.strong_count(), 2); + assert_eq!(weak2.weak_count(), 2); + assert_eq!(weak2.strong_count(), 2); + first_downgrade.notify_one(); second_downgrade.notified().await; - assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.weak_count(), 3); assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.weak_count(), 3); assert_eq!(weak.strong_count(), 2); + assert_eq!(weak2.weak_count(), 3); + assert_eq!(weak2.strong_count(), 2); drop(weak); + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak2.weak_count(), 2); + assert_eq!(weak2.strong_count(), 2); + drop(weak2); assert_eq!(tx.weak_count(), 1); assert_eq!(tx.strong_count(), 2); @@ -553,14 +569,14 @@ async fn strong_and_weak_count() { first_downgrade.notified().await; - assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.weak_count(), 2); assert_eq!(tx.strong_count(), 2); let weak = tx.downgrade(); - assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.weak_count(), 3); assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.weak_count(), 3); assert_eq!(weak.strong_count(), 2); second_downgrade.notify_one(); @@ -596,17 +612,33 @@ async fn unbounded_strong_and_weak_count() { assert_eq!(weak.weak_count(), 1); assert_eq!(weak.strong_count(), 2); + let weak2 = weak.clone(); + + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.strong_count(), 2); + assert_eq!(weak2.weak_count(), 2); + assert_eq!(weak2.strong_count(), 2); + first_downgrade.notify_one(); second_downgrade.notified().await; - assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.weak_count(), 3); assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.weak_count(), 3); assert_eq!(weak.strong_count(), 2); + assert_eq!(weak2.weak_count(), 3); + assert_eq!(weak2.strong_count(), 2); drop(weak); + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.strong_count(), 2); + assert_eq!(weak2.weak_count(), 2); + assert_eq!(weak2.strong_count(), 2); + drop(weak2); assert_eq!(tx.weak_count(), 1); assert_eq!(tx.strong_count(), 2); @@ -616,14 +648,14 @@ async fn unbounded_strong_and_weak_count() { first_downgrade.notified().await; - assert_eq!(tx.weak_count(), 1); + assert_eq!(tx.weak_count(), 2); assert_eq!(tx.strong_count(), 2); let weak = tx.downgrade(); - assert_eq!(tx.weak_count(), 2); + assert_eq!(tx.weak_count(), 3); assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 2); + assert_eq!(weak.weak_count(), 3); assert_eq!(weak.strong_count(), 2); second_downgrade.notify_one(); From 2ccad6accf6f92ee3ad121885075979f17d90083 Mon Sep 17 00:00:00 2001 From: "M.Amin Rayej" Date: Fri, 22 Mar 2024 22:57:49 +0330 Subject: [PATCH 3/3] split big tests into smaller tests --- tokio/tests/sync_mpsc_weak.rs | 191 ++++++++++++++++------------------ 1 file changed, 87 insertions(+), 104 deletions(-) diff --git a/tokio/tests/sync_mpsc_weak.rs b/tokio/tests/sync_mpsc_weak.rs index 914e3bea86c..7716902f959 100644 --- a/tokio/tests/sync_mpsc_weak.rs +++ b/tokio/tests/sync_mpsc_weak.rs @@ -7,9 +7,8 @@ use wasm_bindgen_test::wasm_bindgen_test as test; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::{Acquire, Release}; -use std::sync::Arc; use tokio::sync::mpsc::{self, channel, unbounded_channel}; -use tokio::sync::{oneshot, Notify}; +use tokio::sync::oneshot; #[tokio::test] async fn weak_sender() { @@ -514,159 +513,143 @@ fn test_tx_count_weak_unbounded_sender() { } #[tokio::test] -async fn strong_and_weak_count() { +async fn sender_strong_count_when_cloned() { let (tx, _rx) = mpsc::channel::<()>(1); - let first_downgrade = Arc::new(Notify::new()); - let second_downgrade = Arc::new(Notify::new()); + let tx2 = tx.clone(); - let task_handle = { - let tx = tx.clone(); - let first_downgrade = first_downgrade.clone(); - let second_downgrade = second_downgrade.clone(); + assert_eq!(tx.strong_count(), 2); + assert_eq!(tx2.strong_count(), 2); +} - tokio::spawn(async move { - let weak = tx.downgrade(); +#[tokio::test] +async fn sender_weak_count_when_downgraded() { + let (tx, _rx) = mpsc::channel::<()>(1); - assert_eq!(tx.weak_count(), 1); - assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 1); - assert_eq!(weak.strong_count(), 2); + let weak = tx.downgrade(); - let weak2 = weak.clone(); + assert_eq!(tx.weak_count(), 1); + assert_eq!(weak.weak_count(), 1); +} - assert_eq!(tx.weak_count(), 2); - assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 2); - assert_eq!(weak.strong_count(), 2); - assert_eq!(weak2.weak_count(), 2); - assert_eq!(weak2.strong_count(), 2); +#[tokio::test] +async fn sender_strong_count_when_dropped() { + let (tx, _rx) = mpsc::channel::<()>(1); + + let tx2 = tx.clone(); - first_downgrade.notify_one(); + drop(tx2); - second_downgrade.notified().await; + assert_eq!(tx.strong_count(), 1); +} - assert_eq!(tx.weak_count(), 3); - assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 3); - assert_eq!(weak.strong_count(), 2); - assert_eq!(weak2.weak_count(), 3); - assert_eq!(weak2.strong_count(), 2); +#[tokio::test] +async fn sender_weak_count_when_dropped() { + let (tx, _rx) = mpsc::channel::<()>(1); - drop(weak); - assert_eq!(tx.weak_count(), 2); - assert_eq!(tx.strong_count(), 2); - assert_eq!(weak2.weak_count(), 2); - assert_eq!(weak2.strong_count(), 2); + let weak = tx.downgrade(); - drop(weak2); - assert_eq!(tx.weak_count(), 1); - assert_eq!(tx.strong_count(), 2); + drop(weak); - drop(tx); - }) - }; + assert_eq!(tx.weak_count(), 0); +} - first_downgrade.notified().await; +#[tokio::test] +async fn sender_strong_and_weak_conut() { + let (tx, _rx) = mpsc::channel::<()>(1); - assert_eq!(tx.weak_count(), 2); - assert_eq!(tx.strong_count(), 2); + let tx2 = tx.clone(); let weak = tx.downgrade(); + let weak2 = tx2.downgrade(); - assert_eq!(tx.weak_count(), 3); assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 3); + assert_eq!(tx2.strong_count(), 2); assert_eq!(weak.strong_count(), 2); + assert_eq!(weak2.strong_count(), 2); - second_downgrade.notify_one(); + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx2.weak_count(), 2); + assert_eq!(weak.weak_count(), 2); + assert_eq!(weak2.weak_count(), 2); - task_handle.await.unwrap(); + drop(tx2); + drop(weak2); - assert_eq!(tx.weak_count(), 1); assert_eq!(tx.strong_count(), 1); + assert_eq!(weak.strong_count(), 1); - drop(weak); - - assert_eq!(tx.weak_count(), 0); - assert_eq!(tx.strong_count(), 1); + assert_eq!(tx.weak_count(), 1); + assert_eq!(weak.weak_count(), 1); } #[tokio::test] -async fn unbounded_strong_and_weak_count() { +async fn unbounded_sender_strong_count_when_cloned() { let (tx, _rx) = mpsc::unbounded_channel::<()>(); - let first_downgrade = Arc::new(Notify::new()); - let second_downgrade = Arc::new(Notify::new()); + let tx2 = tx.clone(); - let task_handle = { - let tx = tx.clone(); - let first_downgrade = first_downgrade.clone(); - let second_downgrade = second_downgrade.clone(); + assert_eq!(tx.strong_count(), 2); + assert_eq!(tx2.strong_count(), 2); +} - tokio::spawn(async move { - let weak = tx.downgrade(); +#[tokio::test] +async fn unbounded_sender_weak_count_when_downgraded() { + let (tx, _rx) = mpsc::unbounded_channel::<()>(); - assert_eq!(tx.weak_count(), 1); - assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 1); - assert_eq!(weak.strong_count(), 2); + let weak = tx.downgrade(); - let weak2 = weak.clone(); + assert_eq!(tx.weak_count(), 1); + assert_eq!(weak.weak_count(), 1); +} - assert_eq!(tx.weak_count(), 2); - assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 2); - assert_eq!(weak.strong_count(), 2); - assert_eq!(weak2.weak_count(), 2); - assert_eq!(weak2.strong_count(), 2); +#[tokio::test] +async fn unbounded_sender_strong_count_when_dropped() { + let (tx, _rx) = mpsc::unbounded_channel::<()>(); + + let tx2 = tx.clone(); - first_downgrade.notify_one(); + drop(tx2); - second_downgrade.notified().await; + assert_eq!(tx.strong_count(), 1); +} - assert_eq!(tx.weak_count(), 3); - assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 3); - assert_eq!(weak.strong_count(), 2); - assert_eq!(weak2.weak_count(), 3); - assert_eq!(weak2.strong_count(), 2); +#[tokio::test] +async fn unbounded_sender_weak_count_when_dropped() { + let (tx, _rx) = mpsc::unbounded_channel::<()>(); - drop(weak); - assert_eq!(tx.weak_count(), 2); - assert_eq!(tx.strong_count(), 2); - assert_eq!(weak2.weak_count(), 2); - assert_eq!(weak2.strong_count(), 2); + let weak = tx.downgrade(); - drop(weak2); - assert_eq!(tx.weak_count(), 1); - assert_eq!(tx.strong_count(), 2); + drop(weak); - drop(tx); - }) - }; + assert_eq!(tx.weak_count(), 0); +} - first_downgrade.notified().await; +#[tokio::test] +async fn unbounded_sender_strong_and_weak_conut() { + let (tx, _rx) = mpsc::unbounded_channel::<()>(); - assert_eq!(tx.weak_count(), 2); - assert_eq!(tx.strong_count(), 2); + let tx2 = tx.clone(); let weak = tx.downgrade(); + let weak2 = tx2.downgrade(); - assert_eq!(tx.weak_count(), 3); assert_eq!(tx.strong_count(), 2); - assert_eq!(weak.weak_count(), 3); + assert_eq!(tx2.strong_count(), 2); assert_eq!(weak.strong_count(), 2); + assert_eq!(weak2.strong_count(), 2); - second_downgrade.notify_one(); + assert_eq!(tx.weak_count(), 2); + assert_eq!(tx2.weak_count(), 2); + assert_eq!(weak.weak_count(), 2); + assert_eq!(weak2.weak_count(), 2); - task_handle.await.unwrap(); + drop(tx2); + drop(weak2); - assert_eq!(tx.weak_count(), 1); assert_eq!(tx.strong_count(), 1); + assert_eq!(weak.strong_count(), 1); - drop(weak); - - assert_eq!(tx.weak_count(), 0); - assert_eq!(tx.strong_count(), 1); + assert_eq!(tx.weak_count(), 1); + assert_eq!(weak.weak_count(), 1); }