From 5d05b88de6d3a3eff3cd4e9b57219a25e01d3edc Mon Sep 17 00:00:00 2001 From: Hugo Tunius Date: Tue, 6 Sep 2022 16:44:41 +0100 Subject: [PATCH 1/2] Reproduction of excessive tokio polling With this change the reflect example will demonstrate excessive polling in the operations task despite not being woken. --- examples/Cargo.toml | 4 ++-- examples/examples/reflect/reflect.rs | 2 +- webrtc/src/peer_connection/operation/mod.rs | 17 ++++++++++------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index a12ace73f..b27d7e647 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -16,8 +16,8 @@ repository = "https://github.com/webrtc-rs/examples" [dev-dependencies] webrtc = { path = "../webrtc" } - -tokio = { version = "1.15", features = ["full"] } +console-subscriber = { version = "<=0.1.7" } +tokio = { version = "1.15", features = ["full", "tracing"] } env_logger = "0.9" clap = "3.0" hyper = { version = "0.14", features = ["full"] } diff --git a/examples/examples/reflect/reflect.rs b/examples/examples/reflect/reflect.rs index fcb51c49c..bb62db09d 100644 --- a/examples/examples/reflect/reflect.rs +++ b/examples/examples/reflect/reflect.rs @@ -81,7 +81,7 @@ async fn main() -> Result<()> { .filter(None, log::LevelFilter::Trace) .init(); } - + console_subscriber::init(); // Everything below is the WebRTC-rs API! Thanks for using it ❤️. // Create a MediaEngine object to configure the supported codec diff --git a/webrtc/src/peer_connection/operation/mod.rs b/webrtc/src/peer_connection/operation/mod.rs index 5062f1b79..a8346a2f1 100644 --- a/webrtc/src/peer_connection/operation/mod.rs +++ b/webrtc/src/peer_connection/operation/mod.rs @@ -116,16 +116,19 @@ impl Operations { _ = close_rx.recv() => { break; } - result = ops_rx.recv() => { - if let Some(mut f) = result { - length.fetch_sub(1, Ordering::SeqCst); - if f.0().await { - // Requeue this operation - let _ = Operations::enqueue_inner(f, &ops_tx, &length); - } + Some(mut f) = ops_rx.recv() => { + length.fetch_sub(1, Ordering::SeqCst); + let op = format!("{:?}", f); + log::info!("Started operation {}", op); + if f.0().await { + // Requeue this operation + log::info!("Re-queued operation {}", op); + let _ = Operations::enqueue_inner(f, &ops_tx, &length); } + log::info!("Done with operation {}", op); } } + log::info!("Operation loop spun"); } } From 4c5bbc340ce914aaf56b7f40d72f717c506e4c73 Mon Sep 17 00:00:00 2001 From: Hugo Tunius Date: Tue, 6 Sep 2022 17:29:01 +0100 Subject: [PATCH 2/2] Trace --- examples/examples/reflect/reflect.rs | 2 +- webrtc/Cargo.toml | 1 + webrtc/src/peer_connection/operation/mod.rs | 50 ++++++++++++++++++--- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/examples/examples/reflect/reflect.rs b/examples/examples/reflect/reflect.rs index bb62db09d..d9f20dfdc 100644 --- a/examples/examples/reflect/reflect.rs +++ b/examples/examples/reflect/reflect.rs @@ -78,7 +78,7 @@ async fn main() -> Result<()> { record.args() ) }) - .filter(None, log::LevelFilter::Trace) + .filter(None, log::LevelFilter::Info) .init(); } console_subscriber::init(); diff --git a/webrtc/Cargo.toml b/webrtc/Cargo.toml index 92152684f..ef2dc5367 100644 --- a/webrtc/Cargo.toml +++ b/webrtc/Cargo.toml @@ -44,6 +44,7 @@ ring = "0.16.20" sha2 = "0.10.2" lazy_static = "1.4" hex = "0.4.3" +pin-project-lite = "0.2.9" # [minimal-versions] # fixes "the trait bound `time::Month: From` is not satisfied" diff --git a/webrtc/src/peer_connection/operation/mod.rs b/webrtc/src/peer_connection/operation/mod.rs index a8346a2f1..9006956c5 100644 --- a/webrtc/src/peer_connection/operation/mod.rs +++ b/webrtc/src/peer_connection/operation/mod.rs @@ -1,6 +1,8 @@ #[cfg(test)] mod operation_test; +use pin_project_lite::pin_project; +use std::borrow::Cow; use std::fmt; use std::future::Future; use std::pin::Pin; @@ -11,6 +13,41 @@ use waitgroup::WaitGroup; use crate::error::Result; +pin_project! { + struct PrintingFuture { + name: Cow<'static, str>, + #[pin] + fut: Fut, + } +} + +impl PrintingFuture { + fn new(fut: Fut, name: Cow<'static, str>) -> Self { + Self { fut, name } + } +} + +impl Future for PrintingFuture +where + Fut: Future, +{ + type Output = Fut::Output; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + log::info!("Polled future {}", self.name); + let this = self.project(); + + this.fut.poll(cx) + } +} + +fn trace_future(fut: Fut, name: impl Into>) -> PrintingFuture { + PrintingFuture::new(fut, name.into()) +} + /// Operation is a function pub struct Operation( pub Box Pin + Send + 'static>>) + Send + Sync>, @@ -51,9 +88,10 @@ impl Operations { let l = Arc::clone(&length); let ops_tx = Arc::new(ops_tx); let ops_tx2 = Arc::clone(&ops_tx); - tokio::spawn(async move { - Operations::start(l, ops_tx, ops_rx, close_rx).await; - }); + tokio::spawn(trace_future( + Operations::start(l, ops_tx, ops_rx, close_rx), + "Operations::start", + )); Operations { length, @@ -113,14 +151,14 @@ impl Operations { ) { loop { tokio::select! { - _ = close_rx.recv() => { + _ = trace_future(close_rx.recv(), "close_rx.recv()") => { break; } - Some(mut f) = ops_rx.recv() => { + Some(mut f) = trace_future(ops_rx.recv(), "ops_rx.recv()") => { length.fetch_sub(1, Ordering::SeqCst); let op = format!("{:?}", f); log::info!("Started operation {}", op); - if f.0().await { + if trace_future(f.0(), op.clone()).await { // Requeue this operation log::info!("Re-queued operation {}", op); let _ = Operations::enqueue_inner(f, &ops_tx, &length);