Filtering Observables
This section explains operators you can use to filter and select items emitted by Observables.
- filter( ): filter items emitted by an Observable
- takeLast( ): only emit the last n items emitted by an Observable
- last( ): emit only the last item emitted by an Observable
- lastOrDefault( ): emit only the last item emitted by an Observable, or a default value if the source Observable is empty
- takeLastBuffer( ): emit the last n items emitted by an Observable, as a single list item
- skip( ): ignore the first n items emitted by an Observable
- skipLast( ): ignore the last n items emitted by an Observable
- take( ): emit only the first n items emitted by an Observable
- first( ) and takeFirst( ): emit only the first item emitted by an Observable, or the first item that meets some condition
- firstOrDefault( ): emit only the first item emitted by an Observable, or the first item that meets some condition, or a default value if the source Observable is empty
- elementAt( ): emit item n emitted by the source Observable
- elementAtOrDefault( ): emit item n emitted by the source Observable, or a default item if the source Observable emits fewer than n items
- sample( ) or throttleLast( ): emit the most recent items emitted by an Observable within periodic time intervals
- throttleFirst( ): emit the first items emitted by an Observable within periodic time intervals
- throttleWithTimeout( ) or debounce( ): only emit an item from the source Observable after a particular timespan has passed without the Observable emitting any other items
- timeout( ): emit items from a source Observable, but issue an exception if no item is emitted in a specified timespan
- distinct( ): suppress duplicate items emitted by the source Observable
- distinctUntilChanged( ): suppress duplicate consecutive items emitted by the source Observable
- ofType( ): emit only those items from the source Observable that are of a particular class
- ignoreElements( ): discard the items emitted by the source Observable and only pass through the error or completed notification
You can filter an Observable, discarding any items that do not meet some test, by passing a filtering function into the filter( ) method. For example, the following code filters a list of integers, emitting only those that are even (that is, where the remainder from dividing the number by two is zero):
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
numbers.filter({ 0 == (it % 2) }).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
2
4
6
8
Sequence complete
- javadoc: filter(predicate)
- RxMarbles interactive marble diagram
- Linq: Where
- RxJS: filter
- Introduction to Rx: Where
To convert an Observable that emits several items into one that only emits the last n of these items before completing, use the takeLast( ) method. For instance, in the following code, takeLast( ) emits only the last integer in the list of integers represented by numbers:
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
numbers.takeLast(1).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
9
Sequence complete
There are also versions of takeLast( ) that emit the items that were emitted by the source Observable during a specified window of time before the Observable completed, or that emit a maximum of n items from such a window.
- javadoc: takeLast(count)
- javadoc: takeLast(time, unit) and takeLast(time, unit, scheduler)
- javadoc: takeLast(count, time, unit) and takeLast(count, time, unit, scheduler)
- RxMarbles interactive marble diagram
- Linq: TakeLast
- RxJS: takeLast and takeLastWithTime
- Introduction to Rx: Last
The last( ) operator is equivalent to takeLast(1) except that it will throw a NoSuchElementException if the source Observable does not emit at least one item. Note that there is also a BlockingObservable implementation of last( ).
Note: in the Scala language adaptor for RxJava, this method is called takeRight( ).
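The difference between a non-empty and an empty source can be sketched as follows. This is a minimal illustration, assuming RxJava 1.x is available (the `@Grab` coordinates are an assumption about your setup) and a Groovy version that coerces closures to RxJava's function interfaces; the `record` helper is hypothetical and only collects notifications so they can be compared.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// Hypothetical helper: subscribe and collect every notification in a list.
def record = { Observable source ->
    def events = []
    source.subscribe(
        { events << it },                                    // onNext
        { events << "Error: " + it.getClass().simpleName },  // onError
        { events << "Sequence complete" }                    // onCompleted
    )
    events
}

def nonEmpty = record(Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]).last())
def empty = record(Observable.empty().last())

println(nonEmpty)  // the last item, then the completion notification
println(empty)     // only an error notification
```

With an empty source, the exception arrives through the onError handler rather than being thrown from the call to last( ) itself.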
- Table of similar blocking and non-blocking operators
- javadoc: last( ) and last(predicate)
- RxMarbles interactive marble diagram
- RxJS: last
- Linq: lastAsync
emit only the last item emitted by an Observable, or a default value if the source Observable is empty
The lastOrDefault( ) operator returns an Observable that emits the last item emitted by the source Observable, or a default item if the source Observable does not emit at least one item. Note that there is also a BlockingObservable implementation of lastOrDefault( ).
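A short sketch of both cases, assuming RxJava 1.x on the classpath (the `@Grab` coordinates are an assumption); the default value `-1` and the `record` helper are illustrative choices, not part of the library.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// Hypothetical helper: subscribe and collect every notification in a list.
def record = { Observable source ->
    def events = []
    source.subscribe(
        { events << it },                           // onNext
        { events << "Error: " + it.getMessage() },  // onError
        { events << "Sequence complete" }           // onCompleted
    )
    events
}

def fromItems = record(Observable.from([1, 2, 3]).lastOrDefault(-1))
def fromEmpty = record(Observable.empty().lastOrDefault(-1))

println(fromItems)  // the source's last item is used
println(fromEmpty)  // the default stands in for the missing item
```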
- Table of similar blocking and non-blocking operators
- javadoc: lastOrDefault(default) and lastOrDefault(default, predicate)
- RxJS: lastOrDefault
- Linq: lastOrDefaultAsync
To convert an Observable that emits several items into one that emits the last n of these items as a single list before completing, use the takeLastBuffer( ) method. There are also versions of takeLastBuffer( ) that emit a list containing the items that were emitted by the source Observable during a specified window of time before the Observable completed, or a maximum of n items from such a window.
def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]);
numbers.takeLastBuffer(3).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
[6, 7, 8]
Sequence complete
- javadoc: takeLastBuffer(count)
- javadoc: takeLastBuffer(time, timeunit) and takeLastBuffer(time, timeunit, scheduler)
- javadoc: takeLastBuffer(count, time, timeunit) and takeLastBuffer(count, time, timeunit, scheduler)
- RxJS: takeLastBuffer and takeLastBufferWithTime
You can ignore the first n items emitted by an Observable and attend only to those items that come after, by modifying the Observable with the skip(n) method.
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
numbers.skip(3).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
4
5
6
7
8
9
Sequence complete
There are also versions of skip( ) that ignore the items emitted by an Observable during a specified period of time after the Observable is subscribed to.
Note: in the Scala language adaptor for RxJava, this method is called drop( ).
- javadoc: skip(num)
- javadoc: skip(time, timeunit) and skip(time, timeunit, scheduler)
- RxMarbles interactive marble diagram
- Linq: Skip
- RxJS: skip
- Introduction to Rx: Skip and Take
You can ignore the last n items emitted by an Observable and attend only to those items that precede them, by modifying the Observable with the skipLast(n) method. Note that the mechanism by which this is implemented will delay the emission of any item from the source Observable until n additional items have been emitted by that Observable.
There are also versions of skipLast( ) that ignore the items emitted by an Observable during a specified period of time before the Observable completes.
Note: in the Scala language adaptor for RxJava, this method is called dropRight( ).
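The delay described above can be observed by pushing items through a PublishSubject one at a time. This is a sketch assuming RxJava 1.x (the `@Grab` coordinates are an assumption); the snapshot variables `afterTwo` and `afterThree` are illustrative names.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.subjects.PublishSubject

def seen = []
def subject = PublishSubject.create()
subject.skipLast(2).subscribe(
    { seen << it },                             // onNext
    { seen << "Error: " + it.getMessage() },    // onError
    { seen << "Sequence complete" }             // onCompleted
)

subject.onNext(1)
subject.onNext(2)
def afterTwo = new ArrayList(seen)    // nothing yet: 1 and 2 could still be the last two items
subject.onNext(3)
def afterThree = new ArrayList(seen)  // 1 is released once two later items exist
subject.onCompleted()                 // 2 and 3 turn out to be the last two and are dropped

println(afterTwo)
println(afterThree)
println(seen)
```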
- javadoc: skipLast(count)
- javadoc: skipLast(time, timeunit) and skipLast(time, timeunit, scheduler)
- Linq: SkipLast
- RxJS: skipLast
- Introduction to Rx: SkipLast and TakeLast
You can choose to pay attention only to the first n items emitted by an Observable by calling its take(n) method. That method returns an Observable that will invoke a Subscriber's onNext method a maximum of n times before invoking onCompleted. For example:
numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);
numbers.take(3).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
1
2
3
Sequence complete
If you call take(n) on an Observable, and that Observable emits fewer than n items before completing, the new, take-modified Observable will not throw an exception or invoke onError( ), but will merely emit this same fewer number of items before it completes.
- javadoc: take(num)
- javadoc: take(time, timeunit) and take(time, timeunit, scheduler)
- RxMarbles interactive marble diagram
- Linq: Take
- RxJS: take
- Introduction to Rx: Skip and Take
To create an Observable that emits only the first item emitted by a source Observable (if any), use the first( ) method.
You can also pass a function to this method that evaluates items as they are emitted by the source Observable, in which case first( ) will create an Observable that emits the first such item for which your function returns true (if any).
takeFirst( ) behaves very similarly to first( ), except when the source Observable emits no items (or no items that match the predicate). In such a case, first( ) will throw a NoSuchElementException while takeFirst( ) will return an empty Observable (one that calls onCompleted( ) but never calls onNext( )).
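The following sketch contrasts first( ) and takeFirst( ), assuming RxJava 1.x on the classpath (the `@Grab` coordinates are an assumption). The `record` helper and the predicates are illustrative only.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// Hypothetical helper: subscribe and collect every notification in a list.
def record = { Observable source ->
    def events = []
    source.subscribe(
        { events << it },                                    // onNext
        { events << "Error: " + it.getClass().simpleName },  // onError
        { events << "Sequence complete" }                    // onCompleted
    )
    events
}

def numbers = Observable.from([1, 2, 3, 4])

def firstItem = record(numbers.first())
def firstMatch = record(numbers.first({ it > 2 }))
def firstNoMatch = record(numbers.first({ it > 10 }))         // no item matches: error
def takeFirstNoMatch = record(numbers.takeFirst({ it > 10 })) // no item matches: just completes

println(firstItem)
println(firstMatch)
println(firstNoMatch)
println(takeFirstNoMatch)
```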
- Table of similar blocking and non-blocking operators
- javadoc: first()
- javadoc: first(predicate)
- javadoc: takeFirst(predicate)
- RxMarbles interactive marble diagram
- Linq: firstAsync
- RxJS: first
- Introduction to Rx: First
emit only the first item emitted by an Observable, or the first item that meets some condition, or a default value if the source Observable is empty
To create an Observable that emits only the first item emitted by a source Observable (or a default value if the source Observable is empty), use the firstOrDefault( ) method.
You can also pass a function to this method that evaluates items as they are emitted by the source Observable, in which case firstOrDefault( ) will create an Observable that emits the first such item for which your function returns true (or the supplied default value if no such item is emitted).
Note: in the Scala language adaptor for RxJava, this method is called firstOrElse( ) or headOrElse( ).
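A brief sketch of the three situations, assuming RxJava 1.x (the `@Grab` coordinates are an assumption); the default value `0` and the `record` helper are illustrative.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// Hypothetical helper: subscribe and collect every notification in a list.
def record = { Observable source ->
    def events = []
    source.subscribe(
        { events << it },                           // onNext
        { events << "Error: " + it.getMessage() },  // onError
        { events << "Sequence complete" }           // onCompleted
    )
    events
}

def fromItems = record(Observable.from([1, 2, 3]).firstOrDefault(0))
def fromEmpty = record(Observable.empty().firstOrDefault(0))
def noMatch = record(Observable.from([1, 2, 3]).firstOrDefault(0, { it > 5 }))

println(fromItems)  // first item of the source
println(fromEmpty)  // default, because the source is empty
println(noMatch)    // default, because no item satisfies the predicate
```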
- Table of similar blocking and non-blocking operators
- javadoc: firstOrDefault(default)
- javadoc: firstOrDefault(default, predicate)
- Linq: firstOrDefaultAsync
- RxJS: firstOrDefault
- Introduction to Rx: First
Pass elementAt( ) a zero-based index value and it will emit the solitary item from the source Observable's sequence that matches that index value (for example, if you pass the index value 5, elementAt( ) will emit the sixth item emitted by the source Observable). If you pass in a negative index value, or if the source Observable emits fewer than index value + 1 items, elementAt( ) will throw an IndexOutOfBoundsException.
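As an illustration of the zero-based index and of a source that is too short, consider this sketch. It assumes RxJava 1.x (the `@Grab` coordinates are an assumption); the letter lists and the `record` helper are made up for the example.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// Hypothetical helper: subscribe and collect every notification in a list.
def record = { Observable source ->
    def events = []
    source.subscribe(
        { events << it },                                    // onNext
        { events << "Error: " + it.getClass().simpleName },  // onError
        { events << "Sequence complete" }                    // onCompleted
    )
    events
}

def sixth = record(Observable.from(['a', 'b', 'c', 'd', 'e', 'f', 'g']).elementAt(5))
def tooShort = record(Observable.from(['a', 'b']).elementAt(5))

println(sixth)     // index 5 is the sixth item
println(tooShort)  // fewer than six items: error notification
```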
- javadoc: elementAt(index)
- RxMarbles interactive marble diagram
- Linq: ElementAt
- RxJS: elementAt
- Introduction to Rx: ElementAt
emit item n emitted by the source Observable, or a default item if the source Observable emits fewer than n items
Pass elementAtOrDefault( ) a zero-based index value and it will emit the solitary item from the source Observable's sequence that matches that index value (for example, if you pass the index value 5, elementAtOrDefault( ) will emit the sixth item emitted by the source Observable). If you pass in a negative index value, elementAtOrDefault( ) will throw an IndexOutOfBoundsException. If the source Observable emits fewer than index value + 1 items, elementAtOrDefault( ) will emit the default value you pass in (you must also pass in a type for this value that is appropriate to what type your Subscribers expect to observe).
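A minimal sketch of the in-range and out-of-range cases, assuming RxJava 1.x (the `@Grab` coordinates are an assumption); the default value `'none'` and the `record` helper are illustrative.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// Hypothetical helper: subscribe and collect every notification in a list.
def record = { Observable source ->
    def events = []
    source.subscribe(
        { events << it },                           // onNext
        { events << "Error: " + it.getMessage() },  // onError
        { events << "Sequence complete" }           // onCompleted
    )
    events
}

def letters = Observable.from(['a', 'b'])

def inRange = record(letters.elementAtOrDefault(1, 'none'))
def outOfRange = record(letters.elementAtOrDefault(5, 'none'))

println(inRange)     // index 1 exists, so the source item is used
println(outOfRange)  // index 5 does not exist, so the default is emitted
```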
- javadoc: elementAtOrDefault(index,default)
- Linq: ElementAtOrDefault
- RxJS: elementAtOrDefault
Use the sample( ) method to periodically look at an Observable to see what item it has most recently emitted since the previous sampling. Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from the sample( ) operator will emit no item for that sampling period.
The following code constructs an Observable that emits the numbers between one and a million, and then samples that Observable every ten milliseconds to see what number it is emitting at that moment.
def numbers = Observable.range( 1, 1000000 );
numbers.sample(10, java.util.concurrent.TimeUnit.MILLISECONDS).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
339707
547810
891282
Sequence complete
There is also a version of sample that samples the source Observable not at a regular, periodic interval, but upon each emission of an item (or notification) from a second Observable, called the sampler.
- Backpressure
- javadoc: sample(period,unit) and sample(period,unit,scheduler)
- javadoc: sample(sampler)
- javadoc: throttleLast(period,unit) and throttleLast(period,unit,scheduler)
- RxMarbles interactive marble diagram
- Linq: Sample
- RxJS: sample
- Introduction to Rx: Sample
Use the throttleFirst( ) method to periodically look at an Observable to see what item it emitted first during a particular time span. The following code shows how an Observable can be modified by throttleFirst( ):
Scheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(
{ println(it); }, // onNext
{ println("Error: " + it.getMessage()); }, // onError
{ println("Sequence complete"); } // onCompleted
);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // deliver
o.onNext(2); // skip
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // deliver
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // skip
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onCompleted();
1
3
7
Sequence complete
- Backpressure
- javadoc: throttleFirst(windowDuration,unit)
- javadoc: throttleFirst(windowDuration,unit,scheduler)
only emit an item from the source Observable after a particular timespan has passed without the Observable emitting any other items
Use the throttleWithTimeout( ) method to select only those items emitted by a source Observable that are not quickly superseded by other items.
Note that the last item emitted by the source Observable will be emitted in turn by throttleWithTimeout( ) even if the source Observable's onCompleted notification is issued within the time window you specify since that item's emission. That is to say: an onCompleted notification will not trigger a throttle.
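Both points can be sketched with simulated time, in the manner of the throttleFirst( ) example. This assumes RxJava 1.x (the `@Grab` coordinates are an assumption); the 400 ms window and the timestamps are arbitrary values chosen for the illustration, and debounce( ) is used as the alias listed for throttleWithTimeout( ).

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.schedulers.TestScheduler
import rx.subjects.PublishSubject
import java.util.concurrent.TimeUnit

def s = new TestScheduler()
def o = PublishSubject.create()
def seen = []
o.debounce(400, TimeUnit.MILLISECONDS, s).subscribe(
    { seen << it },                             // onNext
    { seen << "Error: " + it.getMessage() },    // onError
    { seen << "Sequence complete" }             // onCompleted
)

// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS)
o.onNext(1)      // dropped: superseded by 2 only 100 ms later
s.advanceTimeTo(100, TimeUnit.MILLISECONDS)
o.onNext(2)      // delivered: 400 ms pass with no other item
s.advanceTimeTo(600, TimeUnit.MILLISECONDS)
o.onNext(3)      // dropped: superseded by 4 only 100 ms later
s.advanceTimeTo(700, TimeUnit.MILLISECONDS)
o.onNext(4)      // delivered: still pending when the source completes
s.advanceTimeTo(800, TimeUnit.MILLISECONDS)
o.onCompleted()

println(seen)
```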
- Backpressure
- javadoc: throttleWithTimeout(timeout,unit) or debounce(timeout,unit)
- javadoc: throttleWithTimeout(timeout,unit,scheduler) or debounce(timeout,unit,scheduler)
- javadoc: debounce(debounceSelector)
- RxMarbles interactive marble diagram
- Linq: Throttle
- RxJS: throttle
- Introduction to Rx: Throttle
emit items from a source Observable, but issue an exception if no item is emitted in a specified timespan
The timeout( ) operator emits the items emitted by a source Observable unless the source Observable fails to emit an item within a specified period of time since the previous item it emitted, in which case timeout( ) will call onError( ) with a TimeoutException.
Another version of timeout( ) does not call onError( ) but instead switches to emitting items from an alternative Observable if the original Observable fails to emit an item within the specified timeout period.
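Both variants can be sketched with simulated time. This assumes RxJava 1.x (the `@Grab` coordinates are an assumption); the 500 ms limit, the fallback `Observable.just(-1)`, and the `run` helper are illustrative choices.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable
import rx.schedulers.TestScheduler
import rx.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical helper: drive a source with simulated time and record what
// the chosen timeout( ) variant delivers to its subscriber.
def run = { boolean withFallback ->
    def s = new TestScheduler()
    def o = PublishSubject.create()
    def seen = []
    def timed
    if (withFallback) {
        timed = o.timeout(500, TimeUnit.MILLISECONDS, Observable.just(-1), s)
    } else {
        timed = o.timeout(500, TimeUnit.MILLISECONDS, s)
    }
    timed.subscribe(
        { seen << it },                                    // onNext
        { seen << "Error: " + it.getClass().simpleName },  // onError
        { seen << "Sequence complete" }                    // onCompleted
    )
    o.onNext(1)                                   // at 0 ms
    s.advanceTimeTo(400, TimeUnit.MILLISECONDS)
    o.onNext(2)                                   // 400 ms after item 1: in time
    s.advanceTimeTo(1000, TimeUnit.MILLISECONDS)  // more than 500 ms pass after item 2
    o.onNext(3)                                   // too late: the timeout has already fired
    seen
}

def plain = run(false)
def fallback = run(true)

println(plain)     // items, then an error notification
println(fallback)  // items, then the fallback Observable's items
```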
- javadoc: timeout(time,unit) and timeout(time,unit,scheduler)
- javadoc: timeout(time,unit,fallback) and timeout(time,unit,fallback,scheduler)
- Linq: Timeout
- RxJS: timeout and timeoutWithSelector
- Introduction to Rx: Timeout
Use the distinct( ) method to remove duplicate items from a source Observable and only emit single examples of those items.
You can also pass a function or a comparator into distinct( ) that customizes how it distinguishes between distinct and non-distinct items.
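A short sketch of distinct( ) with and without a key selector, assuming RxJava 1.x (the `@Grab` coordinates are an assumption). The word list and the first-letter key are made up for the example.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

def plain = []
Observable.from([1, 2, 1, 3, 2, 4]).distinct().subscribe(
    { plain << it },                             // onNext
    { plain << "Error: " + it.getMessage() },    // onError
    { plain << "Sequence complete" }             // onCompleted
)

// With a key selector, two items count as duplicates when their keys are equal.
def byFirstLetter = []
Observable.from(['apple', 'avocado', 'banana', 'blueberry', 'cherry'])
    .distinct({ it[0] })
    .subscribe(
        { byFirstLetter << it },                             // onNext
        { byFirstLetter << "Error: " + it.getMessage() },    // onError
        { byFirstLetter << "Sequence complete" }             // onCompleted
    )

println(plain)
println(byFirstLetter)
```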
- javadoc: distinct()
- javadoc: distinct(keySelector)
- RxMarbles interactive marble diagram
- Linq: Distinct
- RxJS: distinct
- Introduction to Rx: Distinct and DistinctUntilChanged
Use the distinctUntilChanged( ) method to remove duplicate consecutive items from a source Observable and only emit single examples of such items.
You can also pass a function or a comparator into distinctUntilChanged( ) that customizes how it distinguishes between distinct and non-distinct items.
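The sketch below shows that only consecutive repeats are suppressed, so a value may reappear later in the sequence. It assumes RxJava 1.x (the `@Grab` coordinates are an assumption); the input list is illustrative.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

def seen = []
Observable.from([1, 1, 2, 2, 1, 3, 3]).distinctUntilChanged().subscribe(
    { seen << it },                             // onNext
    { seen << "Error: " + it.getMessage() },    // onError
    { seen << "Sequence complete" }             // onCompleted
)

println(seen)  // the second run of 1 is kept because a 2 came in between
```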
- javadoc: distinctUntilChanged()
- javadoc: distinctUntilChanged(keySelector)
- RxMarbles interactive marble diagram
- Linq: DistinctUntilChanged
- RxJS: distinctUntilChanged
- Introduction to Rx: Distinct and DistinctUntilChanged
If you only want to be notified of the emission of items from an Observable when those items are of a particular class, you can apply the ofType operator to the Observable before subscribing to it.
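For instance, a mixed sequence can be narrowed to its String items. This is a sketch assuming RxJava 1.x (the `@Grab` coordinates are an assumption); the mixed list is made up for the example.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

def seen = []
Observable.from([1, 'two', 3.0d, 'four', 5]).ofType(String).subscribe(
    { seen << it },                             // onNext
    { seen << "Error: " + it.getMessage() },    // onError
    { seen << "Sequence complete" }             // onCompleted
)

println(seen)  // the Integer and Double items are filtered out
```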
- javadoc: ofType(class)
- Linq: OfType
- Introduction to Rx: Cast and OfType
discard the items emitted by the source Observable and only pass through the error or completed notification
If you do not care about the items being emitted by an Observable, but do want to be notified when it completes or when it terminates with an error, you can apply the ignoreElements operator to the Observable, which will ensure that it will never call its observers' onNext handlers.
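A minimal sketch of both terminal cases, assuming RxJava 1.x (the `@Grab` coordinates are an assumption); the `record` helper and the "boom" exception are illustrative.

```groovy
@Grab('io.reactivex:rxjava:1.3.8')
import rx.Observable

// Hypothetical helper: subscribe and collect every notification in a list.
def record = { Observable source ->
    def events = []
    source.subscribe(
        { events << it },                           // onNext
        { events << "Error: " + it.getMessage() },  // onError
        { events << "Sequence complete" }           // onCompleted
    )
    events
}

def completed = record(Observable.from([1, 2, 3]).ignoreElements())
def failed = record(Observable.error(new RuntimeException("boom")).ignoreElements())

println(completed)  // no items, only the completion notification
println(failed)     // the error notification still passes through
```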
- javadoc: ignoreElements()
- Linq: IgnoreElements
- RxJS: ignoreElements
- Introduction to Rx: IgnoreElements
Copyright (c) 2016-present, RxJava Contributors.