futures::future::join_all on JoinHandle is too slow #2401

Closed
bkolobara opened this issue Apr 13, 2020 · 2 comments
Labels
A-tokio (Area: The main tokio crate), C-question (User questions that are neither feature requests nor bug reports), M-task (Module: tokio/task)

Comments

@bkolobara

Tokio version: 0.2.16

Hi everyone,

I'm trying to hunt down some performance issues in my app. I have built a completely unscientific benchmark to demonstrate the issue:

use futures;
use tokio;

use std::time::Instant;

#[tokio::main]
async fn main() {
    let spawn_time = Instant::now();
    let mut sub_tasks = vec![];
    let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
    // Spawn & send
    for _ in 0..1_000_000 {
        let mut sender = tx.clone();
        let handle = tokio::spawn(async move {
            // Discard the send result explicitly; `rx` stays alive until every message is received.
            let _ = sender.send(0).await;
        });
        sub_tasks.push(handle);
    }
    // Receive
    for _ in 0..1_000_000 {
        let _ = rx.recv().await;
    }
    println!(
        "Task spawned and received message: {}ms",
        spawn_time.elapsed().as_millis()
    );

    let join_all_time = Instant::now();

    futures::future::join_all(sub_tasks).await;
    println!(
        "Joining all handles: {}ms",
        join_all_time.elapsed().as_millis()
    );
}

And I get the following results:

Task spawned and received message: 1758ms
Joining all handles: 57047ms

I just can't figure out why it's roughly 32x faster to:

  1. Spawn a task
  2. Send a message from this task
  3. And wait to receive all messages from the tasks

than to:

  1. Check if all tasks finished.

Am I missing something obvious here? Is there a better way to await all sub-tasks? From my perspective it looks like I would be better off manually signalling when each task finishes, by sending a TaskEnded message back to the parent at the end of the async block, instead of using futures::future::join_all().
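For comparison, the manual signalling idea above might look like the sketch below. It uses `std::thread` and `std::sync::mpsc` as stand-ins for `tokio::spawn` and Tokio's channel so that it runs without extra crates, and `TaskEnded` and `run_and_wait` are hypothetical names. The parent stops waiting once every sender has been dropped, so it does not need a fixed receive count.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical completion message, named after the TaskEnded idea above.
#[derive(Debug)]
struct TaskEnded {
    id: usize,
}

/// Spawns `n` workers that each send `TaskEnded` as their last action,
/// then returns the reported ids in sorted order.
fn run_and_wait(n: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    for id in 0..n {
        let tx = tx.clone();
        thread::spawn(move || {
            // ... the worker's real job would run here ...
            tx.send(TaskEnded { id }).unwrap();
        });
    }
    // Drop the parent's own sender: the iterator below then ends as soon as
    // every worker has sent its message and dropped its clone.
    drop(tx);
    let mut ids: Vec<usize> = rx.iter().map(|msg| msg.id).collect();
    ids.sort_unstable();
    ids
}

fn main() {
    let ids = run_and_wait(8);
    println!("{} tasks reported TaskEnded", ids.len());
}
```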

@Darksonn
Contributor

This is join_all's fault. Try this out:

for handle in sub_tasks {
    handle.await.unwrap();
}

On my laptop this gives:

Task spawned and received message: 950ms
Joining all handles: 70ms

This is because every single time one of the futures given to join_all becomes ready, it will poll all of the provided futures, which is very time consuming when there are one million of them. To make matters worse, they won't be removed from the list when they finish, so even if most of them have finished, a single poll on join_all will still require looping through one million items. Finally, because of Tokio's automatic cooperative task yielding feature, you can never complete more than 128 join handles in a single poll. This means that you need at least 7,813 polls of join_all (1,000,000 / 128, rounded up), resulting in at least 7,813,000,000 iterations inside join_all's poll function in total.
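The arithmetic in the previous paragraph can be reproduced with a small cost model. This is a hypothetical sketch, not the actual `futures` code: it assumes that every poll of join_all visits all `n` entries (finished or not) and that the cooperative budget lets at most `budget` handles complete per poll. `join_all_iterations` is an illustrative name.

```rust
/// Cost model (hypothetical, not the real `join_all`).
/// Returns (number of polls of the combined future, total entries visited).
fn join_all_iterations(n: u64, budget: u64) -> (u64, u64) {
    assert!(budget > 0, "budget must be positive");
    let mut remaining = n;
    let mut polls = 0;
    let mut visited = 0;
    while remaining > 0 {
        polls += 1;
        // Finished entries are never removed, so every poll walks all `n`.
        visited += n;
        // The cooperative budget caps how many handles complete per poll.
        remaining -= remaining.min(budget);
    }
    (polls, visited)
}

fn main() {
    let (polls, visited) = join_all_iterations(1_000_000, 128);
    // prints "polls: 7813, entries visited: 7813000000"
    println!("polls: {}, entries visited: {}", polls, visited);
}
```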

If you need more control than the for loop I suggested above, the FuturesUnordered collection type can be used instead. This collection provides support for waiting until exactly one JoinHandle finishes, without the JoinHandles having to finish in the order they appear in the vector.

Example using FuturesUnordered
use std::time::Instant;
use tokio::stream::StreamExt; // for .next()

#[tokio::main]
async fn main() {
    let spawn_time = Instant::now();
    let mut sub_tasks = futures::stream::FuturesUnordered::new();
    let (tx, mut rx) = tokio::sync::mpsc::channel(1000);
    // Spawn & send
    for _ in 0..1_000_000 {
        let mut sender = tx.clone();
        let handle = tokio::spawn(async move {
            sender.send(0).await.unwrap();
        });
        sub_tasks.push(handle);
    }
    // Receive
    for _ in 0..1_000_000 {
        let _ = rx.recv().await;
    }
    println!(
        "Task spawned and received message: {}ms",
        spawn_time.elapsed().as_millis()
    );

    let join_all_time = Instant::now();

    while let Some(item) = sub_tasks.next().await {
        let () = item.unwrap();
    }
    println!(
        "Joining all handles: {}ms",
        join_all_time.elapsed().as_millis()
    );
}

Using FuturesUnordered gives me similar performance:

Task spawned and received message: 1019ms
Joining all handles: 160ms
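To see why FuturesUnordered avoids the same wall, here is a toy model of its central idea: keep a queue of futures that have been woken, poll only those, and drop each future as soon as it is ready. This is a hypothetical sketch, not the real type. Each "future" is just a count of how many polls it needs, and it is re-queued directly where a real waker would re-enqueue it; `ready_queue_polls` is an illustrative name.

```rust
use std::collections::VecDeque;

/// Toy model (hypothetical, not the real `FuturesUnordered`).
/// `pending[i]` is how many polls future `i` needs before it is ready.
/// Returns (total individual polls, indices in completion order).
fn ready_queue_polls(mut pending: Vec<u32>) -> (u64, Vec<usize>) {
    // Every future starts out scheduled once.
    let mut ready: VecDeque<usize> = (0..pending.len()).collect();
    let mut polls = 0u64;
    let mut completed = Vec::with_capacity(pending.len());
    while let Some(i) = ready.pop_front() {
        polls += 1;
        if pending[i] <= 1 {
            // Ready: hand it to the caller and never look at it again.
            completed.push(i);
        } else {
            // Still pending: in the real type its waker would re-enqueue it.
            pending[i] -= 1;
            ready.push_back(i);
        }
    }
    (polls, completed)
}

fn main() {
    let (polls, order) = ready_queue_polls(vec![3, 1, 2]);
    // prints "polls: 6, order: [1, 2, 0]"
    println!("polls: {}, order: {:?}", polls, order);
}
```

In this model the total number of polls is the sum of the per-future counts, so a million handles that have already finished cost about a million polls in total instead of a million per wakeup, and the completion order differs from the insertion order.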

I'm closing this because the issue lies in the futures crate, and not in Tokio. You should however feel free to post additional questions below.

@bkolobara
Author

Thank you Darksonn! This was very helpful.

Darksonn added the A-tokio (Area: The main tokio crate), C-question (User questions that are neither feature requests nor bug reports) and M-task (Module: tokio/task) labels on Apr 20, 2020
nbari added a commit to s3m/sandbox that referenced this issue Aug 8, 2020
20k-ultra added a commit to 20k-ultra/popsicle that referenced this issue Feb 27, 2023