Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[API Proposal]: Making "Process asynchronous tasks as they complete" easy by using IAsyncEnumerable #61959

Closed
Vijay-Nirmal opened this issue Nov 23, 2021 · 12 comments · Fixed by #100316
Assignees
Labels
api-approved API was approved in API review, it can be implemented area-System.Threading.Tasks in-pr There is an active PR which will close this issue when it is merged
Milestone

Comments

@Vijay-Nirmal
Copy link

Vijay-Nirmal commented Nov 23, 2021

EDITED on 1/23/2024 by @stephentoub:

namespace System.Threading.Tasks;

public class Task
{
+   public static IAsyncEnumerable<Task> WhenEach(params Task[] tasks);
+   public static IAsyncEnumerable<Task> WhenEach(params ReadOnlySpan<Task> tasks); // params when possible
+   public static IAsyncEnumerable<Task> WhenEach(IEnumerable<Task> tasks);

+   public static IAsyncEnumerable<Task<TResult>> WhenEach(params Task<TResult>[] tasks); // params for now; move it to ReadOnlySpan overload when that syntax is possible
+   public static IAsyncEnumerable<Task<TResult>> WhenEach(params ReadOnlySpan<Task<TResult>> tasks); // params when possible
+   public static IAsyncEnumerable<Task<TResult>> WhenEach(IEnumerable<Task<TResult>> tasks);
}

Background and motivation

Currently, if we need to "Process asynchronous tasks as they complete" then we need to write lots of unnecessary codes and its not straight forward, something like below.

// Using currently available APIs
List<Task<int>> downloadTasks = downloadTasksQuery.ToList();
while (downloadTasks.Any())
{
    Task<int> finishedTask = await Task.WhenAny(downloadTasks);
    downloadTasks.Remove(finishedTask);
    Process(await finishedTask);
}

API Proposal

namespace System.Threading.Tasks
{
    public class Task : IAsyncResult, IDisposable
    {
        public static IAsyncEnumerable<Task> WhenEach(params Task[] tasks);  // Please change the name, if needed
        public static IAsyncEnumerable<Task> WhenEach(IEnumerable<Task> tasks);
        public static IAsyncEnumerable<Task<TResult>> WhenEach(params Task<TResult>[] tasks); 
        public static IAsyncEnumerable<Task<TResult>> WhenEach(IEnumerable<Task<TResult>> tasks);
    }
}

API Usage

// Using newly created APIs
await foreach (var finishedTask in Task.WhenEach(downloadTasksQuery))
{
    Process(await finishedTask);
}

Alternative Designs

No response

Risks

No response

Updates

(Others can edit this section and add more info)

@Vijay-Nirmal Vijay-Nirmal added the api-suggestion Early API idea and discussion, it is NOT ready for implementation label Nov 23, 2021
@dotnet-issue-labeler dotnet-issue-labeler bot added area-System.Threading.Tasks untriaged New issue has not been triaged by the area owner labels Nov 23, 2021
@ghost
Copy link

ghost commented Nov 23, 2021

Tagging subscribers to this area: @dotnet/area-system-threading-tasks
See info in area-owners.md if you want to be subscribed.

Issue Details

Background and motivation

Currently, if we need to "Process asynchronous tasks as they complete" then we need to write lots of unnecessary codes and its not straight forward, something like below.

// Using currently available APIs
List<Task<int>> downloadTasks = downloadTasksQuery.ToList();
while (downloadTasks.Any())
{
    Task<int> finishedTask = await Task.WhenAny(downloadTasks);
    downloadTasks.Remove(finishedTask);
    Process(await finishedTask);
}

API Proposal

namespace System.Threading.Tasks
{
    public class Task : IAsyncResult, IDisposable
    {
        public static IAsyncEnumerable<Task> WhenAnyAsEnumerable(params Task[] tasks);  // Please change the name, this is not a good name for this method :)
        public static IAsyncEnumerable<Task> WhenAnyAsEnumerable(IEnumerable<Task> tasks);
        public static IAsyncEnumerable<Task<TResult>> WhenAnyAsEnumerable(params Task<TResult>[] tasks); 
        public static IAsyncEnumerable<Task<TResult>> WhenAnyAsEnumerable(IEnumerable<Task<TResult>> tasks);
    }
}

API Usage

// Using new available APIs
await foreach (var finishedTask in Task.WhenAnyAsEnumerable(downloadTasksQuery))
{
    Process(await finishedTask);
}

Alternative Designs

No response

Risks

No response

Author: Vijay-Nirmal
Assignees: -
Labels:

api-suggestion, area-System.Threading.Tasks, untriaged

Milestone: -

@svick
Copy link
Contributor

svick commented Nov 23, 2021

This method already exists in AsyncEx, where it's called OrderByCompletion. It returns a (non-async) collection of wrapper tasks, but I suspect that's because it's older than IAsyncEnumerable<T> and using IAsyncEnumerable<T> returning the original tasks is the better approach today.

@buyaa-n buyaa-n added this to the Future milestone Nov 24, 2021
@buyaa-n buyaa-n added needs-further-triage Issue has been initially triaged, but needs deeper consideration or reconsideration and removed untriaged New issue has not been triaged by the area owner labels Nov 24, 2021
@theodorzoulias
Copy link
Contributor

@Vijay-Nirmal a name that might be more suitable for the Task.WhenAnyAsEnumerable method is Task.WhenEach. :-)

@sakno
Copy link
Contributor

sakno commented Dec 10, 2021

You can achieve the same behavior with TaskCompletionPipe that has native support of IAsyncEnumerable<T> as well as channel-like methods WaitToReadAsync and TryRead.

@stephentoub stephentoub modified the milestones: Future, 9.0.0 Jan 23, 2024
@stephentoub stephentoub self-assigned this Jan 23, 2024
@stephentoub stephentoub added api-ready-for-review API is ready for review, it is NOT ready for implementation and removed needs-further-triage Issue has been initially triaged, but needs deeper consideration or reconsideration api-suggestion Early API idea and discussion, it is NOT ready for implementation labels Jan 23, 2024
@bartonjs
Copy link
Member

bartonjs commented Mar 26, 2024

Video

Looks good as proposed

namespace System.Threading.Tasks;

public class Task
{
    public static IAsyncEnumerable<Task> WhenEach(params Task[] tasks);
    public static IAsyncEnumerable<Task> WhenEach(params ReadOnlySpan<Task> tasks);
    public static IAsyncEnumerable<Task> WhenEach(IEnumerable<Task> tasks);

    public static IAsyncEnumerable<Task<TResult>> WhenEach(params Task<TResult>[] tasks);
    public static IAsyncEnumerable<Task<TResult>> WhenEach(params ReadOnlySpan<Task<TResult>> tasks);
    public static IAsyncEnumerable<Task<TResult>> WhenEach(IEnumerable<Task<TResult>> tasks);
}

@bartonjs bartonjs added api-approved API was approved in API review, it can be implemented and removed api-ready-for-review API is ready for review, it is NOT ready for implementation labels Mar 26, 2024
@dotnet-policy-service dotnet-policy-service bot added the in-pr There is an active PR which will close this issue when it is merged label Mar 26, 2024
@dahlbyk
Copy link
Contributor

dahlbyk commented Mar 31, 2024

Nit: missing <TResult>:

+   public static IAsyncEnumerable<Task<TResult>> WhenEach<TResult>(params Task<TResult>[] tasks); // params for now; move it to ReadOnlySpan overload when that syntax is possible
+   public static IAsyncEnumerable<Task<TResult>> WhenEach<TResult>(params ReadOnlySpan<Task<TResult>> tasks); // params when possible
+   public static IAsyncEnumerable<Task<TResult>> WhenEach<TResult>(IEnumerable<Task<TResult>> tasks);

@alrz
Copy link
Member

alrz commented Apr 1, 2024

@svick

using IAsyncEnumerable<T> returning the original tasks is the better approach today.

I wonder what is the benefit here. Wouldn't IEnumerable be cheaper?

At first look I thought this is going to return the value instead of the task because we're awaiting that one on MoveNextAsync.

public static IAsyncEnumerable<TResult> WhenEach<TResult>(IEnumerable<Task<TResult>> tasks)

@stephentoub
Copy link
Member

I wonder what is the benefit here. Wouldn't IEnumerable be cheaper?

You'd block synchronously in MoveNext waiting for the next task to complete.

At first look I thought this is going to return the value instead of the task because we're awaiting that one on MoveNextAsync.

It's returning the completed task, just like with WhenAny. That gives the consumer the ability to examine / use the completed Task however they like.

@alrz
Copy link
Member

alrz commented Apr 1, 2024

@stephentoub

You'd block synchronously in MoveNext waiting for the next task to complete.

If I'm not mistaken, OrderByCompletion returns a IEnumerable<Task<T>> which you're indeed be awaiting to examine / use however you'd like. Note there's no blocking -- MoveNext returns immediately since we're enumerating an array.

It's returning the completed task, just like with WhenAny. That gives the consumer the ability to examine / use the completed Task however they like.

So what I'm saying is that IEnumerable<Task<T>> will give you exactly that, IAsyncEnumerable seems like an auxiliary helper when you want to stop on first exception.

IAsyncEnumerable<T> WhenEach(Task<T>[] tasks) {
  foreach (Task<T> task in tasks.OrderByCompletion())
    yield return await task;
  }
}      

@stephentoub
Copy link
Member

stephentoub commented Apr 1, 2024

@alrz, how would you propose for MoveNext to immediately complete such that Current could return the next Task even when no next task had yet completed? The only way to do that would be to allocate a new Task that could be returned immediately, with a different object identity than the original task, and then whenever the next task completes, marshal its results/exception/cancellation information to that proxy. At that point, you've significantly increased the cost and you've lost the ability to compare Tasks by reference.

I'm not understanding the aversion to using IAsyncEnumerable here.

@alrz
Copy link
Member

alrz commented Apr 1, 2024

@stephentoub
Copy link
Member

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
api-approved API was approved in API review, it can be implemented area-System.Threading.Tasks in-pr There is an active PR which will close this issue when it is merged
Projects
None yet
Development

Successfully merging a pull request may close this issue.

9 participants