Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.x: better documentation on the abstract consumer classes #5210

Merged
merged 1 commit into from
Mar 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion src/main/java/io/reactivex/observers/DefaultObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,48 @@
import io.reactivex.internal.disposables.DisposableHelper;

/**
* Abstract base implementation of an Observer with support for cancelling a
* Abstract base implementation of an {@link io.reactivex.Observer Observer} with support for cancelling a
* subscription via {@link #cancel()} (synchronously) and calls {@link #onStart()}
* when the subscription happens.
*
* <p>All pre-implemented final methods are thread-safe.
*
* <p>Use the protected {@link #cancel()} to dispose the sequence from within an
* {@code onNext} implementation.
*
* <p>Like all other consumers, {@code DefaultObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
*
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
* If for some reason this can't be avoided, use {@link io.reactivex.Observable#safeSubscribe(io.reactivex.Observer)}
* instead of the standard {@code subscribe()} method.
*
* <p>Example<code><pre>
* Disposable d =
* Observable.range(1, 5)
* .subscribeWith(new DefaultObserver&lt;Integer>() {
* &#64;Override public void onStart() {
* System.out.println("Start!");
* }
* &#64;Override public void onNext(Integer t) {
* if (t == 3) {
* cancel();
* }
* System.out.println(t);
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* }
* });
* // ...
* d.dispose();
* </pre></code>
*
* @param <T> the value type
*/
public abstract class DefaultObserver<T> implements Observer<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,33 @@

/**
* An abstract {@link CompletableObserver} that allows asynchronous cancellation by implementing Disposable.
*
* <p>All pre-implemented final methods are thread-safe.
*
* <p>Like all other consumers, {@code DisposableCompletableObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
*
* <p>Implementation of {@link #onStart()}, {@link #onError(Throwable)} and
* {@link #onComplete()} are not allowed to throw any unchecked exceptions.
*
* <p>Example<code><pre>
* Disposable d =
* Completable.complete().delay(1, TimeUnit.SECONDS)
* .subscribeWith(new DisposableMaybeObserver&lt;Integer>() {
* &#64;Override public void onStart() {
* System.out.println("Start!");
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* }
* });
* // ...
* d.dispose();
* </pre></code>
*/
public abstract class DisposableCompletableObserver implements CompletableObserver, Disposable {
final AtomicReference<Disposable> s = new AtomicReference<Disposable>();
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/io/reactivex/observers/DisposableMaybeObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,41 @@
/**
* An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing Disposable.
*
* <p>All pre-implemented final methods are thread-safe.
*
* <p>Note that {@link #onSuccess(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} are
* exclusive to each other, unlike a regular {@link io.reactivex.Observer Observer}, and
* {@code onComplete()} is never called after an {@code onSuccess()}.
*
* <p>Like all other consumers, {@code DisposableMaybeObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
*
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)} and
* {@link #onComplete()} are not allowed to throw any unchecked exceptions.
*
* <p>Example<code><pre>
* Disposable d =
* Maybe.just(1).delay(1, TimeUnit.SECONDS)
* .subscribeWith(new DisposableMaybeObserver&lt;Integer>() {
* &#64;Override public void onStart() {
* System.out.println("Start!");
* }
* &#64;Override public void onSuccess(Integer t) {
* System.out.println(t);
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* }
* });
* // ...
* d.dispose();
* </pre></code>
*
*
* @param <T> the received value type
*/
public abstract class DisposableMaybeObserver<T> implements MaybeObserver<T>, Disposable {
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/io/reactivex/observers/DisposableObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,44 @@
/**
* An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable.
*
* <p>All pre-implemented final methods are thread-safe.
*
* <p>Use the protected {@link #dispose()} to dispose the sequence from within an
* {@code onNext} implementation.
*
* <p>Like all other consumers, {@code DefaultObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
*
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
* If for some reason this can't be avoided, use {@link io.reactivex.Observable#safeSubscribe(io.reactivex.Observer)}
* instead of the standard {@code subscribe()} method.
*
* <p>Example<code><pre>
* Disposable d =
* Observable.range(1, 5)
* .subscribeWith(new DisposableObserver&lt;Integer>() {
* &#64;Override public void onStart() {
* System.out.println("Start!");
* }
* &#64;Override public void onNext(Integer t) {
* if (t == 3) {
* dispose();
* }
* System.out.println(t);
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* }
* });
* // ...
* d.dispose();
* </pre></code>
*
* @param <T> the received value type
*/
public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/io/reactivex/observers/DisposableSingleObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,33 @@
/**
* An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing Disposable.
*
* <p>All pre-implemented final methods are thread-safe.
*
* <p>Like all other consumers, {@code DisposableSingleObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
*
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)}
* are not allowed to throw any unchecked exceptions.
*
* <p>Example<code><pre>
* Disposable d =
* Single.just(1).delay(1, TimeUnit.SECONDS)
* .subscribeWith(new DisposableSingleObserver&lt;Integer>() {
* &#64;Override public void onStart() {
* System.out.println("Start!");
* }
* &#64;Override public void onSuccess(Integer t) {
* System.out.println(t);
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* }
* });
* // ...
* d.dispose();
* </pre></code>
*
* @param <T> the received value type
*/
public abstract class DisposableSingleObserver<T> implements SingleObserver<T>, Disposable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,51 @@
* An abstract {@link CompletableObserver} that allows asynchronous cancellation of its subscription and associated resources.
*
* <p>All pre-implemented final methods are thread-safe.
*
* <p>Override the protected {@link #onStart()} to perform initialization when this
* {@code ResourceCompletableObserver} is subscribed to a source.
*
* <p>Use the protected {@link #dispose()} to dispose the sequence externally and release
* all resources.
*
* <p>To release the associated resources, one has to call {@link #dispose()}
* in {@code onError()} and {@code onComplete()} explicitly.
*
* <p>Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s)
* with this {@code ResourceCompletableObserver} that will be cleaned up when {@link #dispose()} is called.
* Removing previously associated resources is not possible but one can create a
* {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this
* {@code ResourceCompletableObserver} and then add/remove resources to/from the {@code CompositeDisposable}
* freely.
*
* <p>Like all other consumers, {@code ResourceCompletableObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
*
* <p>Implementation of {@link #onStart()}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
*
* <p>Example<code><pre>
* Disposable d =
* Completable.complete().delay(1, TimeUnit.SECONDS)
* .subscribeWith(new ResourceCompletableObserver() {
* &#64;Override public void onStart() {
* add(Schedulers.single()
* .scheduleDirect(() -> System.out.println("Time!"),
* 2, TimeUnit.SECONDS));
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* dispose();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* dispose();
* }
* });
* // ...
* d.dispose();
* </pre></code>
*/
public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable {
/** The active subscription. */
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/io/reactivex/observers/ResourceMaybeObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,59 @@
*
* <p>All pre-implemented final methods are thread-safe.
*
* <p>Note that {@link #onSuccess(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} are
* exclusive to each other, unlike a regular {@link io.reactivex.Observer Observer}, and
* {@code onComplete()} is never called after an {@code onSuccess()}.
*
* <p>Override the protected {@link #onStart()} to perform initialization when this
* {@code ResourceMaybeObserver} is subscribed to a source.
*
* <p>Use the protected {@link #dispose()} to dispose the sequence externally and release
* all resources.
*
* <p>To release the associated resources, one has to call {@link #dispose()}
* in {@code onSuccess()}, {@code onError()} and {@code onComplete()} explicitly.
*
* <p>Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s)
* with this {@code ResourceMaybeObserver} that will be cleaned up when {@link #dispose()} is called.
* Removing previously associated resources is not possible but one can create a
* {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this
* {@code ResourceMaybeObserver} and then add/remove resources to/from the {@code CompositeDisposable}
* freely.
*
* <p>Like all other consumers, {@code ResourceMaybeObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
*
* <p>Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
*
* <p>Example<code><pre>
* Disposable d =
* Maybe.just(1).delay(1, TimeUnit.SECONDS)
* .subscribeWith(new ResourceMaybeObserver&lt;Integer>() {
* &#64;Override public void onStart() {
* add(Schedulers.single()
* .scheduleDirect(() -> System.out.println("Time!"),
* 2, TimeUnit.SECONDS));
* }
* &#64;Override public void onSuccess(Integer t) {
* System.out.println(t);
* dispose();
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* dispose();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* dispose();
* }
* });
* // ...
* d.dispose();
* </pre></code>
*
* @param <T> the value type
*/
public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable {
Expand Down
51 changes: 51 additions & 0 deletions src/main/java/io/reactivex/observers/ResourceObserver.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,57 @@
*
* <p>All pre-implemented final methods are thread-safe.
*
* <p>To release the associated resources, one has to call {@link #dispose()}
* in {@code onError()} and {@code onComplete()} explicitly.
*
* <p>Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s)
* with this {@code ResourceObserver} that will be cleaned up when {@link #dispose()} is called.
* Removing previously associated resources is not possible but one can create a
* {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this
* {@code ResourceObserver} and then add/remove resources to/from the {@code CompositeDisposable}
* freely.
*
* <p>Use the {@link #dispose()} to dispose the sequence from within an
* {@code onNext} implementation.
*
* <p>Like all other consumers, {@code ResourceObserver} can be subscribed only once.
* Any subsequent attempt to subscribe it to a new source will yield an
* {@link IllegalStateException} with message {@code "Disposable already set!"}.
*
* <p>Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)}
* and {@link #onComplete()} are not allowed to throw any unchecked exceptions.
* If for some reason this can't be avoided, use {@link io.reactivex.Observable#safeSubscribe(io.reactivex.Observer)}
* instead of the standard {@code subscribe()} method.
*
* <p>Example<code><pre>
* Disposable d =
* Observable.range(1, 5)
* .subscribeWith(new ResourceObserver&lt;Integer>() {
* &#64;Override public void onStart() {
* add(Schedulers.single()
* .scheduleDirect(() -> System.out.println("Time!"),
* 2, TimeUnit.SECONDS));
* request(1);
* }
* &#64;Override public void onNext(Integer t) {
* if (t == 3) {
* dispose();
* }
* System.out.println(t);
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* dispose();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* dispose();
* }
* });
* // ...
* d.dispose();
* </pre></code>
*
* @param <T> the value type
*/
public abstract class ResourceObserver<T> implements Observer<T>, Disposable {
Expand Down
Loading