Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add taskset api #4238

Closed
wants to merge 5 commits into from
Closed

add taskset api #4238

wants to merge 5 commits into from

Conversation

Noah-Kennedy
Copy link
Contributor

@Noah-Kennedy Noah-Kennedy commented Nov 16, 2021

Closes #3903

This API simplifies managing the lifetimes of sets of tasks, allowing for joining in the style of FuturesUnordered or mass-cancellation.
@Noah-Kennedy Noah-Kennedy added A-tokio-stream Area: The tokio-stream crate A-tokio-util Area: The tokio-util crate labels Nov 16, 2021
@@ -43,6 +44,7 @@ futures-core = "0.3.0"
futures-sink = "0.3.0"
futures-io = { version = "0.3.0", optional = true }
futures-util = { version = "0.3.0", optional = true }
futures = { version = "0.3.0", optional = true }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like you're only using this for futures::future::pending() in doc-tests, but you can just use std::future::pending() for those.

/// assert_eq!(NUMS, joined.as_slice());
/// # });
/// ```
pub async fn join_all(mut self) -> Vec<Result<T, JoinError>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of this return type - you don't get any #[must_use] warnings if you ignore the return value. Though I suppose you could put #[must_use] on the method itself.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though I suppose you could put #[must_use] on the method itself.

@Darksonn Currently, this might not work as expected on async fn (rust-lang/rust#78149). A lint was added to rustc recently (playground), at least until #[must_use] is changed to apply to the future's output value instead of the impl Future returned by the function.

warning: `must_use` attribute on `async` functions applies to the anonymous `Future` returned by the function, not the value within

Comment on lines 115 to 117
let (t, idx, _tasks) = futures_util::future::select_all(self.tasks.iter_mut()).await;

self.tasks.remove(idx);
Copy link
Contributor

@Darksonn Darksonn Nov 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has an O(n) cost per call, and that's too expensive. It would be a repeat of #2401 if anyone calls this in a loop. The same applies to the poll function (it's worse there).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I scraped both together fairly quickly so that we could look at the API. I'll rewrite the internals of both.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed up a new one that isn't as atrociously awful, but getting this to not be O(n) will require a different underlying data structure. I'll look into this more tonight.

/// # });
/// ```
pub async fn join_all(mut self) -> Vec<Result<T, JoinError>> {
let output = futures_util::future::join_all(self.tasks.iter_mut()).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should just be a simple loop awaiting each join handle in order.

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the status here? Do you need anything from me?

Comment on lines 88 to 92
for task in self.tasks.iter_mut() {
output.push(task.await)
}

output
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it should be ok to add an self.tasks.clear() after the loop so the destructor doesn't have to abort everything again.

Copy link
Contributor Author

@Noah-Kennedy Noah-Kennedy Nov 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the catch, done.

clear taskset after joinall, so drop doesn't try and abort the completed tasks
@Noah-Kennedy
Copy link
Contributor Author

@Darksonn what do we want to do about the O(n) next_finished?

@Darksonn
Copy link
Contributor

You're probably going to need some sort of custom waker to get around it.

@Noah-Kennedy
Copy link
Contributor Author

@Darksonn FuturesUnordered uses several linked lists (I believe one of which is intrusive) to manage the stored tasks. Implementing a similar data structure is one option, although I don't think that implementing such a complicated set of datastructures for a single API is worthwhile.

If we put this into tokio instead of tokio-util, would it be possible for us to reuse some of the internal APIs to simplify the implementation?

@Darksonn
Copy link
Contributor

Well, I'm pretty sure we could do it in a simpler way than what FuturesUnordered does. Anyway, putting it in Tokio itself is an option since you can then put the data in the task object itself.

@Darksonn
Copy link
Contributor

Darksonn commented Feb 8, 2022

Closing since this was implemented in #4335 instead.

@Darksonn Darksonn closed this Feb 8, 2022
@Darksonn Darksonn deleted the noah/taskset-api branch February 8, 2022 09:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio-stream Area: The tokio-stream crate A-tokio-util Area: The tokio-util crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add a TaskSet
3 participants