Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add toObservable() to the common HystrixExecutable interface #312

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentTy
* @return {@code Observable<R>} that executes and calls back with the result of of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through {@link #mapResponseToRequests}
* to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
@Override
public Observable<ResponseType> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<ResponseType> subject = ReplaySubject.create();
Expand All @@ -332,6 +333,7 @@ public Observable<ResponseType> observe() {
* @return {@code Observable<R>} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through
* {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
@Override
public Observable<ResponseType> toObservable() {
// when we callback with the data we want to do the work
// on a separate thread than the one giving us the callback
Expand All @@ -348,6 +350,7 @@ public Observable<ResponseType> toObservable() {
* @return {@code Observable<R>} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through
* {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
@Override
public Observable<ResponseType> toObservable(Scheduler observeOn) {

/* try from cache first */
Expand Down Expand Up @@ -397,6 +400,7 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
* @throws HystrixRuntimeException
* if an error occurs and a fallback cannot be retrieved
*/
@Override
public ResponseType execute() {
try {
return queue().get();
Expand Down Expand Up @@ -427,6 +431,7 @@ public ResponseType execute() {
* @throws HystrixRuntimeException
* within an <code>ExecutionException.getCause()</code> (thrown by {@link Future#get}) if an error occurs and a fallback cannot be retrieved
*/
@Override
public Future<ResponseType> queue() {
final Observable<ResponseType> o = toObservable();
return o.toBlockingObservable().toFuture();
Expand Down
22 changes: 2 additions & 20 deletions hystrix-core/src/main/java/com/netflix/hystrix/HystrixCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ protected String getCacheKey() {
* @throws IllegalStateException
* if invoked more than once
*/
@Override
public Observable<R> toObservable() {
if (observableCommand.properties.executionIsolationStrategy().get().equals(ExecutionIsolationStrategy.THREAD)) {
return toObservable(Schedulers.computation());
Expand All @@ -301,26 +302,7 @@ public Observable<R> toObservable() {
}
}

/**
* A lazy {@link Observable} that will execute the command when subscribed to.
* <p>
* See https://github.com/Netflix/RxJava/wiki for more information.
*
* @param observeOn
* The {@link Scheduler} to execute callbacks on.
* @return {@code Observable<R>} that lazily executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Observer#onError} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
@Override
public Observable<R> toObservable(Scheduler observeOn) {
return toObservable(observeOn, true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import rx.Scheduler;

/**
* Common interface for executables ({@link HystrixCommand} and {@link HystrixCollapser}) so client code can treat them the same and combine in typed collections if desired.
Expand Down Expand Up @@ -64,7 +65,7 @@ public interface HystrixExecutable<R> {
* Used for asynchronous execution of command with a callback by subscribing to the {@link Observable}.
* <p>
* This eagerly starts execution of the command the same as {@link #queue()} and {@link #execute()}.
* A lazy {@link Observable} can be obtained from {@link HystrixCommand#toObservable()} or {@link HystrixCollapser#toObservable()}.
* A lazy {@link Observable} can be obtained from {@link #toObservable()}.
* <p>
* <b>Callback Scheduling</b>
* <p>
Expand All @@ -91,4 +92,54 @@ public interface HystrixExecutable<R> {
*/
public Observable<R> observe();

/**
* A lazy {@link Observable} that will execute the command when subscribed to.
* <p>
* See https://github.com/Netflix/RxJava/wiki for more information.
*
* @param observeOn
* The {@link Scheduler} to execute callbacks on.
* @return {@code Observable<R>} that lazily executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Observer#onError} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public Observable<R> toObservable(Scheduler observeOn);

/**
* A lazy {@link Observable} that will execute the command when subscribed to.
* <p>
* <b>Callback Scheduling</b>
* <p>
* The scheduler to use depends on the individual implementation
* <ul>
* <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
* <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
* </ul>
* <p>
* See https://github.com/Netflix/RxJava/wiki for more information.
*
* @return {@code Observable<R>} that lazily executes and calls back with the result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
*
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Observer#onError} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Observer#onError} if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public Observable<R> toObservable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ protected HystrixExecutableBase(HystrixCommandGroupKey group, HystrixCommandKey
* @throws IllegalStateException
* if invoked more than once
*/
@Override
public R execute() {
try {
return queue().get();
Expand Down Expand Up @@ -321,6 +322,7 @@ public R execute() {
* @throws IllegalStateException
* if invoked more than once
*/
@Override
public Future<R> queue() {
/*
* --- Schedulers.immediate()
Expand Down Expand Up @@ -544,6 +546,7 @@ public R get(long timeout, TimeUnit unit) throws InterruptedException, Execution
* @throws IllegalStateException
* if invoked more than once
*/
@Override
public Observable<R> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<R> subject = ReplaySubject.create();
Expand Down Expand Up @@ -573,12 +576,11 @@ public Observable<R> observe() {
* @throws IllegalStateException
* if invoked more than once
*/
@Override
public Observable<R> toObservable(Scheduler observeOn) {
return toObservable(observeOn, true);
}

public abstract Observable<R> toObservable();

protected abstract ObservableCommand<R> toObservable(final Scheduler observeOn, boolean performAsyncTimeout);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentTy
* @return {@code Observable<R>} that executes and calls back with the result of of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through {@link #mapResponseToRequests}
* to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
@Override
public Observable<ResponseType> observe() {
// us a ReplaySubject to buffer the eagerly subscribed-to Observable
ReplaySubject<ResponseType> subject = ReplaySubject.create();
Expand All @@ -343,6 +344,7 @@ public Observable<ResponseType> observe() {
* @return {@code Observable<R>} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through
* {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
@Override
public Observable<ResponseType> toObservable() {
// when we callback with the data we want to do the work
// on a separate thread than the one giving us the callback
Expand All @@ -359,6 +361,7 @@ public Observable<ResponseType> toObservable() {
* @return {@code Observable<R>} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through
* {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
@Override
public Observable<ResponseType> toObservable(Scheduler observeOn) {

/* try from cache first */
Expand Down Expand Up @@ -408,6 +411,7 @@ public Observable<ResponseType> toObservable(Scheduler observeOn) {
* @throws HystrixRuntimeException
* if an error occurs and a fallback cannot be retrieved
*/
@Override
public ResponseType execute() {
try {
return queue().get();
Expand Down Expand Up @@ -438,6 +442,7 @@ public ResponseType execute() {
* @throws HystrixRuntimeException
* within an <code>ExecutionException.getCause()</code> (thrown by {@link Future#get}) if an error occurs and a fallback cannot be retrieved
*/
@Override
public Future<ResponseType> queue() {
final Observable<ResponseType> o = toObservable();
return o.toBlockingObservable().toFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,12 @@ protected Observable<R> getFallback() {
* @throws IllegalStateException
* if invoked more than once
*/
@Override
public Observable<R> toObservable() {
return toObservable(Schedulers.immediate());
}

@Override
protected ObservableCommand<R> toObservable(final Scheduler observeOn, final boolean performAsyncTimeout) {
/* this is a stateful object so can only be used once */
if (!started.compareAndSet(false, true)) {
Expand Down