Need inclusive predicate variant of takeUntil/takeWhile #1649
Comments
Good question. I'd like to find out from @headinthebox what the canonical solution is to this as Haskell, various Rx flavors and others either must have or need this solution as well. The |
/cc @spodila who also needs a solution to this |
Nice. I could use that. Thank you. |
We cannot add every possible operator that people need in the library. Instead, we must make sure you can achieve what you want by composing a set of orthogonal primitive operators. In this case the solution is as follows: object MainScala {
def main(args: Array[String]): Unit = {
val xs = Observable.items(0,1,2,3,4)
val ys = xs.takeWhile(x => x < 3)
ys.subscribe(y => println(y)) // 0,1,2
val zs = xs.publish[Int]((xs: Observable[Int]) => xs.takeUntil(xs.dropWhile(x => x < 3)))
zs.subscribe(z => println(z)) // 0,1,2,3
readLine()
}
} |
Maybe I am missing something... |
@headinthebox Is there a guarantee that your example will work in every case? I may be missing something, but where in the documentation is it described that the In your example, couldn't |
dropWhile is here: https://github.com/ReactiveX/RxScala/blob/0.x/src/main/scala/rx/lang/scala/Observable.scala#L1981 |
@headinthebox Your example with RxJava 0.20.4 gives me "0 1 2 0 1 2", not "0 1 2 0 1 2 3" as shown in your comments. Your code seems to be the strict equivalent to The proposed |
I'm without a proper computer right now, but did you run exactly the same snippet using RxScala? |
Yup, I only added the import: import rx.lang.scala._
object MainScala {
def main(args: Array[String]): Unit = {
val xs = Observable.items(0, 1, 2, 3, 4)
val ys = xs.takeWhile(x => x < 3)
ys.subscribe(y => println(y)) // 0,1,2
val zs = xs.publish[Int]((xs: Observable[Int]) => xs.takeUntil(xs.dropWhile(x => x < 3)))
zs.subscribe(z => println(z)) // 0,1,2,3
readLine()
} build.sbt:
And run:
But from what I read, this is expected, as |
@headinthebox Any news on this one? |
Was blocked because of broken scala build. |
Yup, the import rx.lang.scala._
object MainScala {
def main(args: Array[String]): Unit = {
val xs = Observable.items(0, 1, 2, 3, 4)
val zs = xs.publish[Int]((xs: Observable[Int]) => xs.takeUntil(xs.dropWhile(x => x < 3).tail))
zs.subscribe(z => println(z)) // 0,1,2,3
readLine()
} |
OK. I can live without it (and do at this time), but I still think RxJava would be better with those inclusive versions. |
The point of Rx is to be composable. We cannot anticipate every single operator that people want to use. There are already way too many operators in the library, so the default answer is "no" until we have seen a pattern appearing in a majority of code (and conversely, we should deprecate operators that can be defined in terms of others and are seldom used). |
So let's start with a count of 1 for |
One problem with using publish is that it breaks backpressure, because it introduces multicasting. It is a fine solution for hot streams but not desirable for cold ones. I use it, for example, on a buffered debounce, but backpressure is not relevant in that case. Take operators, on the other hand, should be usable without breaking backpressure. |
Putting on 1.0.x as this is not a blocker for 1.0 but I want to continue considering and discussing this. Also opened #1732 related to this. |
What about The issue is that @headinthebox and I are good for adding this as it is a common need, but we're struggling with naming. The API would look like this:
What better names (starting with |
What about observable.terminateAfter(_ > 3).subscribe(…) does not look that strange to me. |
|
So what are you proposing? takeUntil is also confusing. |
I am proposing |
And the |
|
There is no "exclusive" Here is usage: [start, a, b, c, stop, d, e].takeUntilXXX(x -> x == stop)
// return [start, a, b, c, stop]
[1, 2, 3, 4, 5].takeUntilXXX(x -> x == 3)
// return [1, 2, 3]
[1, 2, 3, 4, 5].takeUntilXXX(x -> x > 3)
// return [1, 2, 3, 4] |
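The usage above can be modelled on a plain list to make the inclusive semantics concrete. This is only a sketch: `InclusiveTakeSketch` and `takeUntilInclusive` are hypothetical names, the helper works on a `List` rather than an `Observable`, and it is not the RxJava implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper, not part of RxJava: models the proposed inclusive
// "take until" on a plain list instead of an Observable.
final class InclusiveTakeSketch {

    // Copies items up to and including the first one that matches `stop`.
    static <T> List<T> takeUntilInclusive(List<T> source, Predicate<? super T> stop) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);           // emit the item first ...
            if (stop.test(item)) {   // ... then check the stop condition
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);
        System.out.println(takeUntilInclusive(numbers, x -> x == 3)); // [1, 2, 3]
        System.out.println(takeUntilInclusive(numbers, x -> x > 3));  // [1, 2, 3, 4]
    }
}
```

Emitting before testing the predicate is what makes the operator inclusive; swapping the two steps would give the exclusive behaviour.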
Going back to the takeUntil(Observable) // unsubscribe when observable emits
takeUntil(T -> Boolean) // unsubscribe when predicate returns true (inclusive)
takeWhile(T -> Boolean) // unsubscribe when predicate returns false (exclusive) I'm concerned with the arbitrary difference of inclusive/exclusive and similarity of signatures. The "until" suffix feels like it could be either inclusive or exclusive. |
How about |
Note that this operator is not present in any collection library I know of, hence we cannot steal an existing name. I guess Haskell developers would use |
I hit another use case today where I'd really like to have this. |
An experimental |
One way to distinguish the two versions would be |
I second the idea of needing this operator. I'd argue this is a basic primitive stream operation that ought not to be composed. Not including it is like Java designers arguing that you don't need a <= operator on integers because you already have < and == operators. |
@mxklabs how urgently do you need this operator? |
@akarnokd I need it now but can probably work around it. I just wanted to agree with the sentiment that it makes sense to have this operator. |
@akarnokd Also, take until (from what I understand from here) is not exactly what's requested. It takes two observables and terminates as soon as the second observable emits an item, as opposed to taking an observable and a condition and emitting up to and including the first item for which the condition holds. (Not sure if I am looking at the right documentation, sorry) |
Twice now I have been missing a takeUntil(filter) operator on observables. The last time was in cgeo while implementing a low-power mode: I want to receive location updates through an observable and stop as soon as a location arrives with a precision of 20m or less, but I want this location to be returned. Using takeWhile(!filter) would drop the matching item.
Of course, this can be worked around, but I was surprised not to get this operator, which is the dual of takeWhile. Would you consider a contribution adding it?
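The location scenario from the original request can be illustrated without any Rx types. The sketch below is an assumption-laden model: the accuracy readings are made-up values in metres, `LocationFixSketch` and its methods are hypothetical names, and plain lists stand in for the observable of location updates. It contrasts the exclusive `takeWhile(!filter)` with the requested inclusive behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical illustration on plain lists; readings and names are made up.
final class LocationFixSketch {

    // Exclusive: stops before the first reading that is precise enough,
    // so the reading we actually want is lost.
    static List<Double> takeWhileImprecise(List<Double> accuracies, Predicate<Double> preciseEnough) {
        return accuracies.stream()
                .takeWhile(preciseEnough.negate()) // Stream.takeWhile needs Java 9+
                .collect(Collectors.toList());
    }

    // Inclusive: also returns the first reading that is precise enough.
    static List<Double> takeUntilPrecise(List<Double> accuracies, Predicate<Double> preciseEnough) {
        List<Double> out = new ArrayList<>();
        for (Double accuracy : accuracies) {
            out.add(accuracy);
            if (preciseEnough.test(accuracy)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> accuracies = List.of(150.0, 60.0, 18.0, 5.0); // metres, made up
        Predicate<Double> preciseEnough = meters -> meters <= 20.0;
        System.out.println(takeWhileImprecise(accuracies, preciseEnough)); // [150.0, 60.0]
        System.out.println(takeUntilPrecise(accuracies, preciseEnough));   // [150.0, 60.0, 18.0]
    }
}
```

In the exclusive variant the 18.0 m fix never reaches the caller, which is the gap the request describes.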