Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reproduction of excessive tokio polling #284

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ repository = "https://github.com/webrtc-rs/examples"

[dev-dependencies]
webrtc = { path = "../webrtc" }

tokio = { version = "1.15", features = ["full"] }
console-subscriber = { version = "<=0.1.7" }
tokio = { version = "1.15", features = ["full", "tracing"] }
env_logger = "0.9"
clap = "3.0"
hyper = { version = "0.14", features = ["full"] }
Expand Down
4 changes: 2 additions & 2 deletions examples/examples/reflect/reflect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ async fn main() -> Result<()> {
record.args()
)
})
.filter(None, log::LevelFilter::Trace)
.filter(None, log::LevelFilter::Info)
.init();
}

console_subscriber::init();
// Everything below is the WebRTC-rs API! Thanks for using it ❤️.

// Create a MediaEngine object to configure the supported codec
Expand Down
1 change: 1 addition & 0 deletions webrtc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ring = "0.16.20"
sha2 = "0.10.2"
lazy_static = "1.4"
hex = "0.4.3"
pin-project-lite = "0.2.9"

# [minimal-versions]
# fixes "the trait bound `time::Month: From<u8>` is not satisfied"
Expand Down
63 changes: 52 additions & 11 deletions webrtc/src/peer_connection/operation/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#[cfg(test)]
mod operation_test;

use pin_project_lite::pin_project;
use std::borrow::Cow;
use std::fmt;
use std::future::Future;
use std::pin::Pin;
Expand All @@ -11,6 +13,41 @@ use waitgroup::WaitGroup;

use crate::error::Result;

pin_project! {
struct PrintingFuture<Fut> {
name: Cow<'static, str>,
#[pin]
fut: Fut,
}
}

impl<Fut> PrintingFuture<Fut> {
fn new(fut: Fut, name: Cow<'static, str>) -> Self {
Self { fut, name }
}
}

impl<Fut> Future for PrintingFuture<Fut>
where
Fut: Future,
{
type Output = Fut::Output;

fn poll(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
log::info!("Polled future {}", self.name);
let this = self.project();

this.fut.poll(cx)
}
}

fn trace_future<Fut>(fut: Fut, name: impl Into<Cow<'static, str>>) -> PrintingFuture<Fut> {
PrintingFuture::new(fut, name.into())
}

/// Operation is a function
pub struct Operation(
pub Box<dyn (FnMut() -> Pin<Box<dyn Future<Output = bool> + Send + 'static>>) + Send + Sync>,
Expand Down Expand Up @@ -51,9 +88,10 @@ impl Operations {
let l = Arc::clone(&length);
let ops_tx = Arc::new(ops_tx);
let ops_tx2 = Arc::clone(&ops_tx);
tokio::spawn(async move {
Operations::start(l, ops_tx, ops_rx, close_rx).await;
});
tokio::spawn(trace_future(
Operations::start(l, ops_tx, ops_rx, close_rx),
"Operations::start",
));

Operations {
length,
Expand Down Expand Up @@ -113,19 +151,22 @@ impl Operations {
) {
loop {
tokio::select! {
_ = close_rx.recv() => {
_ = trace_future(close_rx.recv(), "close_rx.recv()") => {
break;
}
result = ops_rx.recv() => {
if let Some(mut f) = result {
length.fetch_sub(1, Ordering::SeqCst);
if f.0().await {
// Requeue this operation
let _ = Operations::enqueue_inner(f, &ops_tx, &length);
}
Some(mut f) = trace_future(ops_rx.recv(), "ops_rx.recv()") => {
length.fetch_sub(1, Ordering::SeqCst);
let op = format!("{:?}", f);
log::info!("Started operation {}", op);
if trace_future(f.0(), op.clone()).await {
// Requeue this operation
log::info!("Re-queued operation {}", op);
let _ = Operations::enqueue_inner(f, &ops_tx, &length);
}
log::info!("Done with operation {}", op);
}
}
log::info!("Operation loop spun");
}
}

Expand Down