diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index ddded8ebb31..29f4ef60121 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -1,7 +1,13 @@ +#[cfg(all(tokio_unstable, feature = "tracing"))] +use tracing::Span; + use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError}; use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError}; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; + cfg_time! { use crate::sync::mpsc::error::SendTimeoutError; use crate::time::Duration; @@ -20,6 +26,8 @@ use std::task::{Context, Poll}; /// [`PollSender`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSender.html pub struct Sender { chan: chan::Tx, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: Span, } /// Permits to send one value into the channel. @@ -47,6 +55,8 @@ pub struct Permit<'a, T> { /// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned pub struct OwnedPermit { chan: Option>, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: Span, } /// Receives values from the associated `Sender`. @@ -59,6 +69,8 @@ pub struct OwnedPermit { pub struct Receiver { /// The channel receiver. chan: chan::Rx, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: Span, } /// Creates a bounded mpsc channel for communicating between asynchronous tasks @@ -105,13 +117,50 @@ pub struct Receiver { /// } /// } /// ``` +#[track_caller] pub fn channel(buffer: usize) -> (Sender, Receiver) { assert!(buffer > 0, "mpsc bounded channel requires buffer > 0"); let semaphore = (semaphore::Semaphore::new(buffer), buffer); - let (tx, rx) = chan::channel(semaphore); - let tx = Sender::new(tx); - let rx = Receiver::new(rx); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + let resource_span = tracing::trace_span!( + "runtime.resource", + concrete_type = "Sender|Receiver", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); + + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + capacity = buffer, + capacity.op = "override", + ) + }); + + resource_span + }; + + let (tx, rx) = chan::channel( + semaphore, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span.clone(), + ); + + let tx = Sender::new( + tx, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span.clone(), + ); + let rx = Receiver::new( + rx, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + ); (tx, rx) } @@ -121,8 +170,15 @@ pub fn channel(buffer: usize) -> (Sender, Receiver) { type Semaphore = (semaphore::Semaphore, usize); impl Receiver { - pub(crate) fn new(chan: chan::Rx) -> Receiver { - Receiver { chan } + pub(crate) fn new( + chan: chan::Rx, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: Span, + ) -> Receiver { + Receiver { + chan, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } /// Receives the next value for this receiver. @@ -184,7 +240,22 @@ impl Receiver { /// ``` pub async fn recv(&mut self) -> Option { use crate::future::poll_fn; - poll_fn(|cx| self.chan.recv(cx)).await + + #[cfg(all(tokio_unstable, feature = "tracing"))] + let fut = { + let resource_span = self.resource_span.clone(); + trace::async_op( + || poll_fn(|cx| self.chan.recv(cx)), + resource_span, + "Receiver::recv", + "poll_recv", + true, + ) + }; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let fut = poll_fn(|cx| self.chan.recv(cx)); + + fut.await } /// Tries to receive the next value for this receiver. @@ -367,8 +438,15 @@ impl fmt::Debug for Receiver { impl Unpin for Receiver {} impl Sender { - pub(crate) fn new(chan: chan::Tx) -> Sender { - Sender { chan } + pub(crate) fn new( + chan: chan::Tx, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: Span, + ) -> Sender { + Sender { + chan, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } /// Sends a value, waiting until there is capacity. @@ -427,7 +505,21 @@ impl Sender { /// } /// ``` pub async fn send(&self, value: T) -> Result<(), SendError> { - match self.reserve().await { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let fut = { + let resource_span = self.resource_span.clone(); + trace::async_op( + || self.reserve(), + resource_span, + "Sender::send", + "poll_send", + true, + ) + }; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let fut = self.reserve(); + + match fut.await { Ok(permit) => { permit.send(value); Ok(()) @@ -473,7 +565,21 @@ impl Sender { /// } /// ``` pub async fn closed(&self) { - self.chan.closed().await + #[cfg(all(tokio_unstable, feature = "tracing"))] + let fut = { + let resource_span = self.resource_span.clone(); + trace::async_op( + || self.chan.closed(), + resource_span, + "Sender::closed", + "poll_closed", + true, + ) + }; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let fut = self.chan.closed(); + + fut.await } /// Attempts to immediately send a message on this `Sender` @@ -603,7 +709,21 @@ impl Sender { value: T, timeout: Duration, ) -> Result<(), SendTimeoutError> { - let permit = match crate::time::timeout(timeout, self.reserve()).await { + #[cfg(all(tokio_unstable, feature = "tracing"))] + let fut = { + let resource_span = self.resource_span.clone(); + trace::async_op( + || self.reserve(), + resource_span, + "Sender::send_timeout", + "poll_send_timeout", + true, + ) + }; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let fut = self.reserve(); + + let permit = match crate::time::timeout(timeout, fut).await { Err(_) => { return Err(SendTimeoutError::Timeout(value)); } @@ -722,7 +842,21 @@ impl Sender { /// } /// ``` pub async fn reserve(&self) -> Result, SendError<()>> { - self.reserve_inner().await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let fut = { + let resource_span = self.resource_span.clone(); + trace::async_op( + || self.reserve_inner(), + resource_span, + "Sender::reserve", + "poll_reserve", + true, + ) + }; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let fut = self.reserve_inner(); + + fut.await?; Ok(Permit { chan: &self.chan }) } @@ -807,9 +941,25 @@ impl Sender { /// [`send`]: OwnedPermit::send /// [`Arc::clone`]: std::sync::Arc::clone pub async fn reserve_owned(self) -> Result, SendError<()>> { - self.reserve_inner().await?; + #[cfg(all(tokio_unstable, feature = "tracing"))] + let fut = { + let resource_span = self.resource_span.clone(); + trace::async_op( + || self.reserve_inner(), + resource_span, + "Sender::reserve_owned", + "poll_reserve_owned", + true, + ) + }; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let fut = self.reserve_inner(); + + fut.await?; Ok(OwnedPermit { chan: Some(self.chan), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span, }) } @@ -937,6 +1087,8 @@ impl Sender { Ok(OwnedPermit { chan: Some(self.chan), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span, }) } @@ -994,6 +1146,8 @@ impl Clone for Sender { fn clone(&self) -> Self { Sender { chan: self.chan.clone(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), } } } @@ -1119,7 +1273,11 @@ impl OwnedPermit { }); chan.send(value); - Sender { chan } + Sender { + chan, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), + } } /// Releases the reserved capacity *without* sending a message, returning the @@ -1161,7 +1319,11 @@ impl OwnedPermit { // Add the permit back to the semaphore chan.semaphore().add_permit(); - Sender { chan } + Sender { + chan, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), + } } } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index c3007de89c7..5d4037f5ebf 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -17,6 +17,9 @@ use std::task::{Context, Poll}; /// Channel sender. pub(crate) struct Tx { inner: Arc>, + + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } impl fmt::Debug for Tx { @@ -28,6 +31,9 @@ impl fmt::Debug for Tx { /// Channel receiver. pub(crate) struct Rx { inner: Arc>, + + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: tracing::Span, } impl fmt::Debug for Rx { @@ -104,7 +110,10 @@ impl fmt::Debug for RxFields { unsafe impl Send for Chan {} unsafe impl Sync for Chan {} -pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { +pub(crate) fn channel( + semaphore: S, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: tracing::Span, +) -> (Tx, Rx) { let (tx, rx) = list::channel(); let chan = Arc::new(Chan { @@ -119,14 +128,77 @@ pub(crate) fn channel(semaphore: S) -> (Tx, Rx) { }), }); - (Tx::new(chan.clone()), Rx::new(chan)) + #[cfg(all(tokio_unstable, feature = "tracing"))] + { + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx = 0, + rx.op = "override", + ) + }); + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + tx = 0, + tx.op = "override", + ) + }); + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + values = 0, + values.op = "override", + ) + }); + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx_dropped = false, + rx_dropped.op = "override", + ) + }); + } + + let tx = Tx::new( + chan.clone(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span.clone(), + ); + + let rx = Rx::new( + chan, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + ); + + (tx, rx) } // ===== impl Tx ===== impl Tx { - fn new(chan: Arc>) -> Tx { - Tx { inner: chan } + fn new( + chan: Arc>, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: tracing::Span, + ) -> Tx { + #[cfg(all(tokio_unstable, feature = "tracing"))] + { + let tx_handles = chan.tx_count.load(Relaxed) + 1; + resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + tx_handles, + tx_handles.op = "override", + ) + }); + } + + Tx { + inner: chan, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } pub(super) fn semaphore(&self) -> &S { @@ -135,6 +207,20 @@ impl Tx { /// Send a message and notify the receiver. pub(crate) fn send(&self, value: T) { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + tx = 1, + tx.op = "add", + ); + tracing::trace!( + target: "runtime::resource::state_update", + values = 1, + values.op = "add", + ) + }); + self.inner.send(value); } @@ -171,17 +257,38 @@ impl Clone for Tx { fn clone(&self) -> Tx { // Using a Relaxed ordering here is sufficient as the caller holds a // strong ref to `self`, preventing a concurrent decrement to zero. - self.inner.tx_count.fetch_add(1, Relaxed); + #[allow(unused_variables)] + let tx_count = self.inner.tx_count.fetch_add(1, Relaxed); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + tx_handles = tx_count + 1, + tx_handles.op = "override", + ) + }); Tx { inner: self.inner.clone(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), } } } impl Drop for Tx { fn drop(&mut self) { - if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { + let tx_count = self.inner.tx_count.fetch_sub(1, AcqRel); + if tx_count != 1 { + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + tx_handles = tx_count - 1, + tx_handles.op = "override", + ) + }); return; } @@ -196,8 +303,15 @@ impl Drop for Tx { // ===== impl Rx ===== impl Rx { - fn new(chan: Arc>) -> Rx { - Rx { inner: chan } + fn new( + chan: Arc>, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: tracing::Span, + ) -> Rx { + Rx { + inner: chan, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } pub(crate) fn close(&mut self) { @@ -231,6 +345,21 @@ impl Rx { Some(Value(value)) => { self.inner.semaphore.add_permit(); coop.made_progress(); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx = 1, + rx.op = "add", + ); + tracing::trace!( + target: "runtime::resource::state_update", + values = 1, + values.op = "sub", + ) + }); + return Ready(Some(value)); } Some(Closed) => { @@ -279,6 +408,21 @@ impl Rx { match rx_fields.list.try_pop(&self.inner.tx) { TryPopResult::Ok(value) => { self.inner.semaphore.add_permit(); + + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx = 1, + rx.op = "add", + ); + tracing::trace!( + target: "runtime::resource::state_update", + values = 1, + values.op = "sub", + ) + }); + return Ok(value); } TryPopResult::Closed => return Err(TryRecvError::Disconnected), @@ -317,6 +461,15 @@ impl Drop for Rx { fn drop(&mut self) { use super::block::Read::Value; + #[cfg(all(tokio_unstable, feature = "tracing"))] + self.resource_span.in_scope(|| { + tracing::trace!( + target: "runtime::resource::state_update", + rx_dropped = true, + rx_dropped.op = "override", + ) + }); + self.close(); self.inner.rx_fields.with_mut(|rx_fields_ptr| { diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index f8338fb0885..6d030440339 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -1,7 +1,13 @@ +#[cfg(all(tokio_unstable, feature = "tracing"))] +use tracing::Span; + use crate::loom::sync::atomic::AtomicUsize; use crate::sync::mpsc::chan; use crate::sync::mpsc::error::{SendError, TryRecvError}; +#[cfg(all(tokio_unstable, feature = "tracing"))] +use crate::util::trace; + use std::fmt; use std::task::{Context, Poll}; @@ -11,12 +17,16 @@ use std::task::{Context, Poll}; /// [`unbounded_channel`](unbounded_channel) function. pub struct UnboundedSender { chan: chan::Tx, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: Span, } impl Clone for UnboundedSender { fn clone(&self) -> Self { UnboundedSender { chan: self.chan.clone(), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: self.resource_span.clone(), } } } @@ -40,6 +50,8 @@ impl fmt::Debug for UnboundedSender { pub struct UnboundedReceiver { /// The channel receiver chan: chan::Rx, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span: Span, } impl fmt::Debug for UnboundedReceiver { @@ -60,11 +72,37 @@ impl fmt::Debug for UnboundedReceiver { /// **Note** that the amount of available system memory is an implicit bound to /// the channel. Using an `unbounded` channel has the ability of causing the /// process to run out of memory. In this case, the process will be aborted. +#[track_caller] pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) { - let (tx, rx) = chan::channel(AtomicUsize::new(0)); + #[cfg(all(tokio_unstable, feature = "tracing"))] + let resource_span = { + let location = std::panic::Location::caller(); + tracing::trace_span!( + "runtime.resource", + concrete_type = "Sender|Receiver", + kind = "Sync", + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ) + }; - let tx = UnboundedSender::new(tx); - let rx = UnboundedReceiver::new(rx); + let (tx, rx) = chan::channel( + AtomicUsize::new(0), + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span.clone(), + ); + + let tx = UnboundedSender::new( + tx, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span.clone(), + ); + let rx = UnboundedReceiver::new( + rx, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + ); (tx, rx) } @@ -73,8 +111,15 @@ pub fn unbounded_channel() -> (UnboundedSender, UnboundedReceiver) { type Semaphore = AtomicUsize; impl UnboundedReceiver { - pub(crate) fn new(chan: chan::Rx) -> UnboundedReceiver { - UnboundedReceiver { chan } + pub(crate) fn new( + chan: chan::Rx, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: Span, + ) -> UnboundedReceiver { + UnboundedReceiver { + chan, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } /// Receives the next value for this receiver. @@ -134,7 +179,21 @@ impl UnboundedReceiver { pub async fn recv(&mut self) -> Option { use crate::future::poll_fn; - poll_fn(|cx| self.poll_recv(cx)).await + #[cfg(all(tokio_unstable, feature = "tracing"))] + let poll = { + let resource_span = self.resource_span.clone(); + trace::async_op( + || poll_fn(|cx| self.poll_recv(cx)), + resource_span, + "Receiver::recv", + "poll_recv", + true, + ) + }; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let poll = poll_fn(|cx| self.poll_recv(cx)); + + poll.await } /// Tries to receive the next value for this receiver. @@ -249,8 +308,15 @@ impl UnboundedReceiver { } impl UnboundedSender { - pub(crate) fn new(chan: chan::Tx) -> UnboundedSender { - UnboundedSender { chan } + pub(crate) fn new( + chan: chan::Tx, + #[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: Span, + ) -> UnboundedSender { + UnboundedSender { + chan, + #[cfg(all(tokio_unstable, feature = "tracing"))] + resource_span, + } } /// Attempts to send a message on this `UnboundedSender` without blocking. @@ -341,7 +407,21 @@ impl UnboundedSender { /// } /// ``` pub async fn closed(&self) { - self.chan.closed().await + #[cfg(all(tokio_unstable, feature = "tracing"))] + let poll = { + let resource_span = self.resource_span.clone(); + trace::async_op( + || self.chan.closed(), + resource_span, + "Receiver::recv", + "poll_recv", + true, + ) + }; + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let poll = self.chan.closed(); + + poll.await } /// Checks if the channel has been closed. This happens when the