Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: instrument mpsc channels #4644

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 177 additions & 15 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
#[cfg(all(tokio_unstable, feature = "tracing"))]
use tracing::Span;

use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
use crate::sync::mpsc::chan;
use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};

#[cfg(all(tokio_unstable, feature = "tracing"))]
use crate::util::trace;

cfg_time! {
use crate::sync::mpsc::error::SendTimeoutError;
use crate::time::Duration;
Expand All @@ -20,6 +26,8 @@ use std::task::{Context, Poll};
/// [`PollSender`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSender.html
pub struct Sender<T> {
chan: chan::Tx<T, Semaphore>,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: Span,
}

/// Permits to send one value into the channel.
Expand Down Expand Up @@ -47,6 +55,8 @@ pub struct Permit<'a, T> {
/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
pub struct OwnedPermit<T> {
chan: Option<chan::Tx<T, Semaphore>>,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: Span,
}

/// Receives values from the associated `Sender`.
Expand All @@ -59,6 +69,8 @@ pub struct OwnedPermit<T> {
pub struct Receiver<T> {
/// The channel receiver.
chan: chan::Rx<T, Semaphore>,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: Span,
}

/// Creates a bounded mpsc channel for communicating between asynchronous tasks
Expand Down Expand Up @@ -105,13 +117,50 @@ pub struct Receiver<T> {
/// }
/// }
/// ```
#[track_caller]
pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
let semaphore = (semaphore::Semaphore::new(buffer), buffer);
let (tx, rx) = chan::channel(semaphore);

let tx = Sender::new(tx);
let rx = Receiver::new(rx);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let resource_span = {
let location = std::panic::Location::caller();
let resource_span = tracing::trace_span!(
"runtime.resource",
concrete_type = "Sender|Receiver",
kind = "Sync",
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);

resource_span.in_scope(|| {
tracing::trace!(
target: "runtime::resource::state_update",
capacity = buffer,
capacity.op = "override",
)
});

resource_span
};

let (tx, rx) = chan::channel(
semaphore,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span.clone(),
);

let tx = Sender::new(
tx,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span.clone(),
);
let rx = Receiver::new(
rx,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
);

(tx, rx)
}
Expand All @@ -121,8 +170,15 @@ pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
type Semaphore = (semaphore::Semaphore, usize);

impl<T> Receiver<T> {
pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
Receiver { chan }
pub(crate) fn new(
chan: chan::Rx<T, Semaphore>,
#[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: Span,
) -> Receiver<T> {
Receiver {
chan,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
}
}

/// Receives the next value for this receiver.
Expand Down Expand Up @@ -184,7 +240,22 @@ impl<T> Receiver<T> {
/// ```
pub async fn recv(&mut self) -> Option<T> {
use crate::future::poll_fn;
poll_fn(|cx| self.chan.recv(cx)).await

#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| poll_fn(|cx| self.chan.recv(cx)),
resource_span,
"Receiver::recv",
"poll_recv",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = poll_fn(|cx| self.chan.recv(cx));

fut.await
}

/// Tries to receive the next value for this receiver.
Expand Down Expand Up @@ -367,8 +438,15 @@ impl<T> fmt::Debug for Receiver<T> {
impl<T> Unpin for Receiver<T> {}

impl<T> Sender<T> {
pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
Sender { chan }
pub(crate) fn new(
chan: chan::Tx<T, Semaphore>,
#[cfg(all(tokio_unstable, feature = "tracing"))] resource_span: Span,
) -> Sender<T> {
Sender {
chan,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span,
}
}

/// Sends a value, waiting until there is capacity.
Expand Down Expand Up @@ -427,7 +505,21 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
match self.reserve().await {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.reserve(),
resource_span,
"Sender::send",
"poll_send",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.reserve();

match fut.await {
Ok(permit) => {
permit.send(value);
Ok(())
Expand Down Expand Up @@ -473,7 +565,21 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn closed(&self) {
self.chan.closed().await
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.chan.closed(),
resource_span,
"Sender::closed",
"poll_closed",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.chan.closed();

fut.await
}

/// Attempts to immediately send a message on this `Sender`
Expand Down Expand Up @@ -603,7 +709,21 @@ impl<T> Sender<T> {
value: T,
timeout: Duration,
) -> Result<(), SendTimeoutError<T>> {
let permit = match crate::time::timeout(timeout, self.reserve()).await {
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.reserve(),
resource_span,
"Sender::send_timeout",
"poll_send_timeout",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.reserve();

let permit = match crate::time::timeout(timeout, fut).await {
Err(_) => {
return Err(SendTimeoutError::Timeout(value));
}
Expand Down Expand Up @@ -722,7 +842,21 @@ impl<T> Sender<T> {
/// }
/// ```
pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
self.reserve_inner().await?;
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.reserve_inner(),
resource_span,
"Sender::reserve",
"poll_reserve",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.reserve_inner();

fut.await?;
Ok(Permit { chan: &self.chan })
}

Expand Down Expand Up @@ -807,9 +941,25 @@ impl<T> Sender<T> {
/// [`send`]: OwnedPermit::send
/// [`Arc::clone`]: std::sync::Arc::clone
pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
self.reserve_inner().await?;
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
let resource_span = self.resource_span.clone();
trace::async_op(
|| self.reserve_inner(),
resource_span,
"Sender::reserve_owned",
"poll_reserve_owned",
true,
)
};
#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let fut = self.reserve_inner();

fut.await?;
Ok(OwnedPermit {
chan: Some(self.chan),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span,
})
}

Expand Down Expand Up @@ -937,6 +1087,8 @@ impl<T> Sender<T> {

Ok(OwnedPermit {
chan: Some(self.chan),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span,
})
}

Expand Down Expand Up @@ -994,6 +1146,8 @@ impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Sender {
chan: self.chan.clone(),
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
}
}
}
Expand Down Expand Up @@ -1119,7 +1273,11 @@ impl<T> OwnedPermit<T> {
});
chan.send(value);

Sender { chan }
Sender {
chan,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
}
}

/// Releases the reserved capacity *without* sending a message, returning the
Expand Down Expand Up @@ -1161,7 +1319,11 @@ impl<T> OwnedPermit<T> {

// Add the permit back to the semaphore
chan.semaphore().add_permit();
Sender { chan }
Sender {
chan,
#[cfg(all(tokio_unstable, feature = "tracing"))]
resource_span: self.resource_span.clone(),
}
}
}

Expand Down
Loading