select! without FusedFuture #1989

Open
Matthias247 opened this issue Nov 25, 2019 · 6 comments
Labels
A-future Area: futures::future A-macro Area: macro related C-feature-request S-needs-implementation Status: Implementation work is needed.

Comments

@Matthias247
Contributor

Matthias247 commented Nov 25, 2019

select! is probably my favorite API in the whole Rust async story. That is because it enables use-cases that are not possible in normal threaded code: We can wait on arbitrary operations to complete concurrently (not only on channels) - and depending on the result and current state even cancel some of the branches.

I think the most recent changes, which allow select! on non-pinned Futures, already made this operation a fair bit more accessible for most users, since they no longer need to deal with pinning in most cases.

I'm now thinking about whether we could not still improve in another area: Fusing. The requirement that branches need to implement FusedFuture imposes 2 downsides:

  • Users need to discover what that FusedFuture thing actually is, and add .fuse() to most of their statements (since they might be coming from an async fn).
  • Types which want to implement FusedFuture directly (since it's more correct and more ergonomic) must take a dependency on futures-core, which some libraries want to avoid, either because they have a zero-dependency policy or because they are unhappy that futures-rs is still not 1.0.
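To make the first point concrete, here is a minimal std-only sketch of what fusing provides. The `Fuse` type and `is_terminated` method below only imitate the shape of their futures-rs counterparts; they are written for illustration and are not the crate's actual implementation. `NoopWake` and `poll_once` are hypothetical helpers.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for a fused future (illustration only).
struct Fuse<F> {
    // `None` once the inner future has completed.
    inner: Option<Pin<Box<F>>>,
}

impl<F: Future> Fuse<F> {
    fn new(fut: F) -> Self {
        Fuse { inner: Some(Box::pin(fut)) }
    }

    /// The information a select-style macro can use to skip this branch.
    fn is_terminated(&self) -> bool {
        self.inner.is_none()
    }
}

impl<F: Future> Future for Fuse<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let result = match self.inner.as_mut() {
            Some(fut) => fut.as_mut().poll(cx),
            // Already completed: stay pending instead of polling again.
            None => return Poll::Pending,
        };
        if result.is_ready() {
            self.inner = None;
        }
        result
    }
}

/// Hypothetical helper: a waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: poll a future exactly once.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

Polling `Fuse::new(async { 7 })` once with `poll_once` yields `Poll::Ready(7)`; afterwards `is_terminated()` is true and further polls return `Poll::Pending` rather than polling a completed future.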

I am now contemplating whether the use-cases for which FusedFuture is required in select! could also be supported in another fashion, one with different trade-offs.

First of all regarding the use-cases: I think the main use-case for fusing is to selectively disable branches inside select!, which prevents busy-looping. It is in that sense similar to the Go pattern of selecting on nil channels, which disables a select branch. I think the select! macro could support selective branches in a similar fashion: By enabling or disabling individual branches of a select! by user function calls (instead of automatically disabling them based on previous observation that the branch terminated).

In order to enable/disable branches, users would need to store the Stream or Future for these branches in a certain Switch/Option type which is known to the macro.

Translating the Go example from the link, it could look something like this:

async fn merge(a_stream: Receiver<i32>, b_stream: Receiver<i32>) -> Receiver<i32> {
    let (sender, receiver) = channel();
    spawn(async move {
        // Wrap the types which we are interested in disabling during select in something
        // which offers a disable function. The name is bad. Open for anything.
        let a_stream = SelectDisabler::new(a_stream);
        let b_stream = SelectDisabler::new(b_stream);
        // Keep going until both branches are disabled (as in the Go example, where the
        // loop runs while either channel is non-nil).
        while a_stream.is_enabled() || b_stream.is_enabled() {
            select! {
                // This block is only executed if `a_stream` is still enabled. Since we need
                // to know whether something supports enabling/disabling at compile time
                // (unless we have specialization), this requires a custom macro syntax like
                // `if_enabled` here. In the future that could potentially be dropped.
                a_result = if_enabled(a_stream) => {
                    match a_result {
                        Some(v) => sender.send(v).await,
                        None => {
                            // Disable the branch
                            a_stream.disable();
                        }
                    }
                }
                b_result = if_enabled(b_stream) => {
                    match b_result {
                        Some(v) => sender.send(v).await,
                        None => {
                            // Disable the branch
                            b_stream.disable();
                        }
                    }
                }
            }
        }
    });
    receiver
}

With such an API the terminated branch obviously only makes sense if all branches in the select! support disabling.

Benefits:

  • No need for .fuse() anymore in common select! use-cases. Directly selecting on Futures and Streams would just work, unless people need selective disabling (which the minority probably do).
  • Libraries would no longer be required to implement FusedFuture and could potentially avoid being dependent on futures-core for select compatibility.
  • Users can also disable branches even when they have not yet run to completion. They could even disable and reenable them. E.g. to implement throttling.
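The enable/disable switch behind these points could be as small as a flag stored next to the future. The following is a std-only sketch under that assumption: `SelectDisabler` here is a hypothetical type that borrows the name from the proposal, and `NoopWake` and `poll_once` are illustrative helpers. It shows only the polling behaviour, not any macro integration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical wrapper: a future plus a user-controlled on/off switch.
struct SelectDisabler<F> {
    inner: F,
    enabled: bool,
}

impl<F> SelectDisabler<F> {
    fn new(inner: F) -> Self {
        SelectDisabler { inner, enabled: true }
    }

    fn is_enabled(&self) -> bool {
        self.enabled
    }

    fn disable(&mut self) {
        self.enabled = false;
    }

    fn enable(&mut self) {
        self.enabled = true;
    }
}

impl<F: Future + Unpin> Future for SelectDisabler<F> {
    type Output = F::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        if !self.enabled {
            // A disabled branch never wins and never touches the inner future.
            return Poll::Pending;
        }
        Pin::new(&mut self.inner).poll(cx)
    }
}

/// Hypothetical helper: a waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical helper: poll a future exactly once.
fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}
```

With this sketch, a disabled `SelectDisabler::new(std::future::ready(5))` stays pending when polled, and resolves to 5 once `enable()` is called again, which is the disable/re-enable behaviour that throttling would rely on.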

Drawbacks:

  • Adding support for explicit enabling/disabling is not a breaking API change. However, no longer respecting the FusedFuture information is one, and software which relied on it would no longer behave the same. I'm not really sure how much of that is out there, though.
  • This requires users to disable branches correctly in order to avoid endless loops instead of the Future/Stream implementations doing it automatically.

WDYT @cramertj, @Nemo157, @taiki-e?

@Matthias247
Contributor Author

Just figured out that Streams are currently not supported directly, but only by generating a Future from them via .next(). This means the example that I provided doesn't directly work that way.

But I think selecting on a_result = if_enabled(a_stream.next()) => { could work, if we simply invoke the expression inside if_enabled lazily only in the cases where the selection is still active. Actually that's even nicer, since it also avoids the dependency on any specific Stream type definition.

@Matthias247
Contributor Author

Matthias247 commented Nov 25, 2019

Just another syntax idea to make sure arbitrary expressions that resolve to a Future can be awaited and selecting on them can be disabled:

Instead of wrapping the Stream, the disabler is just a plain struct, and disabling is expressed in a fashion similar to if guards on match arms:

async fn merge(a_stream: Receiver<i32>, b_stream: Receiver<i32>) -> Receiver<i32> {
    let (sender, receiver) = channel();
    spawn(async move {
        let a_disabler = SelectDisabler::new();
        let b_disabler = SelectDisabler::new();

        // Keep going until both branches are disabled.
        while a_disabler.is_enabled() || b_disabler.is_enabled() {
            select! {
                a_result = a_stream.next() if_enabled(a_disabler) => {
                    match a_result {
                        Some(v) => sender.send(v).await,
                        None => {
                            // Disable the branch
                            a_disabler.disable();
                        }
                    }
                }
                b_result = b_stream.next() if_enabled(b_disabler) => {
                    match b_result {
                        Some(v) => sender.send(v).await,
                        None => {
                            // Disable the branch
                            b_disabler.disable();
                        }
                    }
                }
            }
        }
    });
    receiver
}

@tekjar

tekjar commented Apr 15, 2020

https://github.com/tekjar/rumq/blob/master/rumq-client/src/eventloop.rs#L169-L172

Conditional branch polling helped me implement very readable flow control based on the current inflight queue size. Any plans to bring this feature to the futures select!?

@cramertj
Member

@tekjar I'd be happy to accept a PR! I don't personally have the bandwidth to implement it myself at the moment, sorry.

@bugaevc

bugaevc commented Oct 18, 2021

a_result = a_stream.next() if_enabled(a_disabler)

An even simpler alternative would be to allow selecting on an Option<Future>. To disable an arm, set it to None. This way it's expressed in the type system & you can drop futures early, before other arms complete.
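As a rough illustration of the Option-based idea, the std-only sketch below treats a `None` slot as a disabled arm and drops a future as soon as it completes. `poll_slot`, `select_once`, `Winner`, and `NoopWake` are hypothetical names invented for this example; this is not how the futures-rs select! macro is implemented.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Poll an optional future. `None` means "arm disabled" and is always pending.
fn poll_slot<F: Future + Unpin>(slot: &mut Option<F>, cx: &mut Context<'_>) -> Poll<F::Output> {
    let polled = match slot.as_mut() {
        Some(fut) => Pin::new(fut).poll(cx),
        None => return Poll::Pending,
    };
    if polled.is_ready() {
        // Drop the completed future right away so it is never polled again.
        *slot = None;
    }
    polled
}

/// Outcome of one polling round over two arms.
#[derive(Debug, PartialEq)]
enum Winner<A, B> {
    First(A),
    Second(B),
    Pending,
}

/// Hypothetical helper: a waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Poll both arms once, in order, and report which one (if any) completed.
fn select_once<A, B>(a: &mut Option<A>, b: &mut Option<B>) -> Winner<A::Output, B::Output>
where
    A: Future + Unpin,
    B: Future + Unpin,
{
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    if let Poll::Ready(v) = poll_slot(a, &mut cx) {
        return Winner::First(v);
    }
    if let Poll::Ready(v) = poll_slot(b, &mut cx) {
        return Winner::Second(v);
    }
    Winner::Pending
}
```

In this sketch, disabling an arm is just `arm = None`, and the same assignment drops the future early. A real executor would suspend on `Winner::Pending` and wait for a wake-up instead of returning it to the caller.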

Please also note that selecting on stream.next() is a very problematic thing to do, as it makes a fresh new .next() call on each iteration and then drops (cancels) the future if any other arm is taken. It's probably fine for the case of receiving from a channel; but should be discouraged in general.

@Nemo157
Member

Nemo157 commented Oct 18, 2021

Please also note that selecting on stream.next() is a very problematic thing to do, as it makes a fresh new .next() call on each iteration and then drops (cancels) the future if any other arm is taken. It's probably fine for the case of receiving from a channel; but should be discouraged in general.

With futures::stream::Stream and the currently unstable std::stream::Stream, that is invisible to the stream: the Next future holds no state of its own, so the stream has no way to tell whether it is being polled by the same or a new instance of Next. This would only matter if we get something like the AsyncIter trait discussed in https://internals.rust-lang.org/t/blog-series-dyn-async-in-traits/15449.
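A small std-only sketch of this argument follows. The `Stream` trait here is a local stand-in with the same `poll_next` shape as futures::stream::Stream, and `Next` mirrors the idea of a future that merely borrows the stream; `EveryOtherPoll`, `poll_fresh_next`, and `NoopWake` are hypothetical names used only for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Local stand-in for a poll-based stream trait (illustration only).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

/// A `next()`-style future: nothing but a borrow of the stream.
struct Next<'a, S> {
    stream: &'a mut S,
}

impl<S: Stream + Unpin> Future for Next<'_, S> {
    type Output = Option<S::Item>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // All progress is recorded in the stream, none in `Next`.
        Pin::new(&mut *self.stream).poll_next(cx)
    }
}

/// Test stream that is pending on every other poll and counts upwards.
struct EveryOtherPoll {
    next: u32,
    ready: bool,
}

impl EveryOtherPoll {
    fn new() -> Self {
        EveryOtherPoll { next: 0, ready: true }
    }
}

impl Stream for EveryOtherPoll {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<u32>> {
        self.ready = !self.ready;
        if self.ready {
            let value = self.next;
            self.next += 1;
            Poll::Ready(Some(value))
        } else {
            Poll::Pending
        }
    }
}

/// Hypothetical helper: a waker that does nothing.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Create a brand-new `Next`, poll it once, then drop it (as a losing select arm would).
fn poll_fresh_next<S: Stream + Unpin>(stream: &mut S) -> Poll<Option<S::Item>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut next = Next { stream };
    let out = Pin::new(&mut next).poll(&mut cx);
    out
}
```

Calling `poll_fresh_next` repeatedly on one `EveryOtherPoll` alternates between pending and the values 0, 1, 2, and so on: no item is lost even though every call drops its `Next`. This only demonstrates the trait shape; a stream whose `poll_next` itself wraps a cancellation-sensitive future could still behave differently.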

@taiki-e taiki-e removed their assignment Jun 18, 2023
@taiki-e taiki-e added the S-needs-implementation Status: Implementation work is needed. label Jul 19, 2023

6 participants