Skip to content

Commit

Permalink
Emit OperationStatus corresponding to Void response (Azure#349)
Browse files Browse the repository at this point in the history
  • Loading branch information
RikkiGibson authored Jan 19, 2018
1 parent 22dd100 commit 10934ce
Showing 1 changed file with 14 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Single;
import io.reactivex.SingleSource;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;

Expand All @@ -41,6 +42,7 @@
import java.lang.reflect.Type;
import java.net.NetworkInterface;
import java.util.Enumeration;
import java.util.concurrent.Callable;

/**
* This class can be used to create an Azure specific proxy implementation for a provided Swagger
Expand Down Expand Up @@ -256,12 +258,18 @@ public ObservableSource<?> apply(HttpResponse httpResponse) throws Exception {
return createPollStrategy(httpRequest, Single.just(bufferedHttpResponse), methodParser).flatMapObservable(new Function<PollStrategy, ObservableSource<OperationStatus<?>>>() {
@Override
public ObservableSource<OperationStatus<?>> apply(final PollStrategy pollStrategy) throws Exception {
Observable<OperationStatus<?>> first = handleBodyReturnTypeAsync(bufferedHttpResponse, methodParser, operationStatusResultType).flatMapObservable(new Function<Object, ObservableSource<OperationStatus<?>>>() {
@Override
public ObservableSource<OperationStatus<?>> apply(Object operationResult) throws Exception {
return Observable.<OperationStatus<?>>just(new OperationStatus<>(operationResult, pollStrategy.status()));
}
});
Observable<OperationStatus<?>> first = handleBodyReturnTypeAsync(bufferedHttpResponse, methodParser, operationStatusResultType)
.map(new Function<Object, OperationStatus<?>>() {
@Override
public OperationStatus<?> apply(Object operationResult) throws Exception {
return new OperationStatus<>(operationResult, pollStrategy.status());
}
}).switchIfEmpty(Single.defer(new Callable<SingleSource<? extends OperationStatus<?>>>() {
@Override
public SingleSource<? extends OperationStatus<?>> call() throws Exception {
return Single.just(new OperationStatus<>((Object) null, pollStrategy.status()));
}
})).toObservable();
Observable<OperationStatus<Object>> rest = pollStrategy.pollUntilDoneWithStatusUpdates(httpRequest, methodParser, operationStatusResultType);
return first.concatWith(rest);
}
Expand Down

0 comments on commit 10934ce

Please sign in to comment.