Skip to content

Filtering Observables

David Gross edited this page Aug 19, 2014 · 79 revisions

This section explains operators you can use to filter and select items emitted by Observables.

  • filter( ) — filter items emitted by an Observable
  • takeLast( ) — only emit the last n items emitted by an Observable
  • last( ) — emit only the last item emitted by an Observable
  • lastOrDefault( ) — emit only the last item emitted by an Observable, or a default value if the source Observable is empty
  • takeLastBuffer( ) — emit the last n items emitted by an Observable, as a single list item
  • skip( ) — ignore the first n items emitted by an Observable
  • skipLast( ) — ignore the last n items emitted by an Observable
  • take( ) — emit only the first n items emitted by an Observable
  • first( ) and takeFirst( ) — emit only the first item emitted by an Observable, or the first item that meets some condition
  • firstOrDefault( ) — emit only the first item emitted by an Observable, or the first item that meets some condition, or a default value if the source Observable is empty
  • elementAt( ) — emit item n emitted by the source Observable
  • elementAtOrDefault( ) — emit item n emitted by the source Observable, or a default item if the source Observable emits fewer than n items
  • sample( ) or throttleLast( ) — emit the most recent items emitted by an Observable within periodic time intervals
  • throttleFirst( ) — emit the first items emitted by an Observable within periodic time intervals
  • throttleWithTimeout( ) or debounce( ) — only emit an item from the source Observable after a particular timespan has passed without the Observable emitting any other items
  • timeout( ) — emit items from a source Observable, but issue an exception if no item is emitted in a specified timespan
  • distinct( ) — suppress duplicate items emitted by the source Observable
  • distinctUntilChanged( ) — suppress duplicate consecutive items emitted by the source Observable
  • ofType( ) — emit only those items from the source Observable that are of a particular class
  • ignoreElements( ) — discard the items emitted by the source Observable and only pass through the error or completed notification

filter( )

filter items emitted by an Observable

You can filter an Observable, discarding any items that do not meet some test, by passing a filtering function into the filter( ) method. For example, the following code filters a list of integers, emitting only those that are even (that is, where the remainder from dividing the number by two is zero):

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

numbers.filter({ 0 == (it % 2) }).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
2
4
6
8
Sequence complete

see also:


takeLast( )

only emit the last n items emitted by an Observable

To convert an Observable that emits several items into one that only emits the last n of these items before completing, use the takeLast( ) method. For instance, in the following code, takeLast( ) emits only the last integer in the list of integers represented by numbers:

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

numbers.takeLast(1).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
9
Sequence complete

There are also versions of takeLast( ) that emit the items that were emitted by the source Observable during a specified window of time before the Observable completed, or that emit a maximum of n items from such a window.

see also:


last( )

only emit the last item emitted by an Observable

The last( ) operator is equivalent to takeLast(1) except that it will throw an NoSuchElementException if the source Observable does not emit at least one item. Note that there is also a BlockingObservable implementation of last( ).

Note: in the scala language adaptor for RxJava, this method is called takeRight( ).

see also:


lastOrDefault( )

emit only the last item emitted by an Observable, or a default value if the source Observable is empty

The lastOrDefault( ) operator returns an Observable that emits the last item emitted by the source Observable, or a default item if the source Observable does not emit at least one item. Note that there is also a BlockingObservable implementation of lastOrDefault( ).

see also:


takeLastBuffer( )

emit the last n items emitted by an Observable, as a single list item

To convert an Observable that emits several items into one that emits the last n of these items as a single list before completing, use the takeLastBuffer( ) method. There are also versions of takeLastBuffer( ) that emit a list containing the items that were emitted by the source Observable during a specified window of time before the Observable completed, or a maximum of n items from such a window.

def numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8]);

numbers.takeLastBuffer(3).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
[6, 7, 8]
Sequence complete

see also:


skip()

ignore the first n items emitted by an Observable

You can ignore the first n items emitted by an Observable and attend only to those items that come after, by modifying the Observable with the skip(n) method.

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

numbers.skip(3).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
4
5
6
7
8
9
Sequence complete

There are also versions of skip() that ignore the items emitted by an Observable during a specified period of time after the Observable is subscribed to.

Note: in the scala language adaptor for RxJava, this method is called drop( ).

see also:


skipLast()

ignore the last n items emitted by an Observable

You can ignore the last n items emitted by an Observable and attend only to those items that preced them, by modifying the Observable with the skipLast(n) method. Note that the mechanism by which this is implemented will delay the emission of any item from the source Observable until n additional items have been emitted by that Observable.

There are also versions of skipLast() that ignore the items emitted by an Observable during a specified period of time before the Observable completes.

Note: in the scala language adaptor for RxJava, this method is called dropRight( ).

see also:


take( )

emit only the first n items emitted by an Observable

You can choose to pay attention only to the first n items emitted by an Observable by calling its take(n) method. That method returns an Observable that will invoke an Subscriber’s onNext method a maximum of n times before invoking onCompleted. For example,

numbers = Observable.from([1, 2, 3, 4, 5, 6, 7, 8, 9]);

numbers.take(3).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
1
2
3
Sequence complete

If you call take(n) on an Observable, and that Observable emits fewer than n items before completing, the new, take-modified Observable will not throw an exception or invoke onError(), but will merely emit this same fewer number of items before it completes.

see also:


first( ) and takeFirst( )

emit only the first item emitted by an Observable, or the first item that meets some condition

To create an Observable that emits only the first item emitted by a source Observable (if any), use the first( ) method.

You can also pass a function to this method that evaluates items as they are emitted by the source Observable, in which case first( ) will create an Observable that emits the first such item for which your function returns true (if any).

takeFirst( ) behaves very similarly to first( ) with the exception of how they behave when the source Observable emits no items (or no items that match the predicate). In such a case, first( ) will throw an NoSuchElementException while takeFirst( ) will return an empty Observable (one that calls onCompleted( ) but never calls onNext( )).

see also:


firstOrDefault( )

emit only the first item emitted by an Observable, or the first item that meets some condition, or a default value if the source Observable is empty

To create an Observable that emits only the first item emitted by a source Observable (or a default value if the source Observable is empty), use the firstOrDefault( ) method.

You can also pass a function to this method that evaluates items as they are emitted by the source Observable, in which case firstOrDefault( ) will create an Observable that emits the first such item for which your function returns true (or the supplied default value if no such item is emitted).

Note: in the scala language adaptor for RxJava, this method is called firstOrElse( ) or headOrElse( ).

see also:


elementAt( )

emit item n emitted by the source Observable

Pass elementAt( ) a zero-based index value and it will emit the solitary item from the source Observable's sequence that matches that index value (for example, if you pass the index value 5, elementAt( ) will emit the sixth item emitted by the source Observable). If you pass in a negative index value, or if the source Observable emits fewer than index value + 1 items, elementAt( ) will throw an IndexOutOfBoundsException.

see also:


elementAtOrDefault( )

emit item n emitted by the source Observable, or a default item if the source Observable emits fewer than n items

Pass elementAtOrDefault( ) a zero-based index value and it will emit the solitary item from the source Observable's sequence that matches that index value (for example, if you pass the index value 5, elementAtOrDefault( ) will emit the sixth item emitted by the source Observable). If you pass in a negative index value, elementAtOrDefault( ) will throw an IndexOutOfBoundsException. If the source Observable emits fewer than index value + 1 items, elementAtOrDefault( ) will emit the default value you pass in (you must also pass in a type for this value that is appropriate to what type your Subscribers expect to observe).

see also:


sample( ) or throttleLast( )

emit the most recent items emitted by an Observable within periodic time intervals

Use the sample( ) method to periodically look at an Observable to see what item it has most recently emitted since the previous sampling. Note that if the source Observable has emitted no items since the last time it was sampled, the Observable that results from the sample( ) operator will emit no item for that sampling period.

The following code constructs an Observable that emits the numbers between one and a million, and then samples that Observable every ten milliseconds to see what number it is emitting at that moment.

def numbers = Observable.range( 1, 1000000 );
 
numbers.sample(10, java.util.concurrent.TimeUnit.MILLISECONDS).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
339707
547810
891282
Sequence complete

There is also a version of sample that samples the source Observable not at a regular, periodic interval, but upon each emission of an item (or notification) from a second Observable, called the sampler. The following marble diagram illustrates this use of sample:

see also:


throttleFirst( )

emit the first items emitted by an Observable within periodic time intervals

Use the throttleFirst( ) method to periodically look at an Observable to see what item it emitted first during a particular time span. The following code shows how an Observable can be modified by throttleFirst( ):

Scheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(
  { println(it); },                          // onNext
  { println("Error: " + it.getMessage()); }, // onError
  { println("Sequence complete"); }          // onCompleted
);
// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // deliver
o.onNext(2); // skip
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // deliver
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // skip
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onCompleted();
1
3
7
Sequence complete

see also:


throttleWithTimeout( ) or debounce( )

only emit an item from the source Observable after a particular timespan has passed without the Observable emitting any other items

Use the throttleWithTimeout( ) method to select only those items emitted by a source Observable that are not quickly superceded by other items.

Note that the last item emitted by the source Observable will be emitted in turn by throttleWithTimeout( ) even if the source Observable's onCompleted notification is issued within the time window you specify since that item's emission. That is to say: an onCompleted notification will not trigger a throttle.

see also:


timeout( )

emit items from a source Observable, but issue an exception if no item is emitted in a specified timespan

The timeout( ) operator emits the items emitted by a source Observable unless the source Observable fails to emit an item within a specified period of time since the previous item it emitted, in which case timeout( ) will call onError( ) with a TimeoutException.

Another version of timeout( ) does not call onError( ) but instead switches to emitting items from an alternative Observable if the original Observable fails to emit an item within the specified timeout period:

see also:


distinct( )

suppress duplicate items emitted by the source Observable

Use the distinct( ) method to remove duplicate items from a source Observable and only emit single examples of those items.

You can also pass a function or a comparator into distinct( ) that customizes how it distinguishes between distinct and non-distinct items.

see also:


distinctUntilChanged( )

suppress duplicate consecutive items emitted by the source Observable

Use the distinctUntilChanged( ) method to remove duplicate consecutive items from a source Observable and only emit single examples of such items.

You can also pass a function or a comparator into distinctUntilChanged( ) that customizes how it distinguishes between distinct and non-distinct items.

see also:


ofType( )

emit only those items from the source Observable that are of a particular class

see also:


ignoreElements( )

discard the items emitted by the source Observable and only pass through the error or completed notification

see also:

Clone this wiki locally