Skip to content

Commit

Permalink
Merge pull request Azure#387 from alvadb/v2
Browse files Browse the repository at this point in the history
Initial check in for resumable LRO.
  • Loading branch information
alvadb authored Feb 23, 2018
2 parents 7dd4975 + be7995a commit 29b33eb
Show file tree
Hide file tree
Showing 11 changed files with 575 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,16 @@

package com.microsoft.azure.v2;

import com.microsoft.rest.v2.RestProxy;
import com.microsoft.rest.v2.SwaggerMethodParser;
import com.microsoft.rest.v2.RestProxy;
import com.microsoft.rest.v2.http.HttpMethod;
import com.microsoft.rest.v2.http.HttpRequest;
import com.microsoft.rest.v2.http.HttpResponse;
import io.reactivex.Single;
import io.reactivex.functions.Function;

import java.io.IOException;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;

Expand All @@ -23,12 +24,7 @@
* running operation.
*/
public final class AzureAsyncOperationPollStrategy extends PollStrategy {
private final URL operationResourceUrl;
private final URL originalResourceUrl;

private boolean pollingCompleted;
private boolean pollingSucceeded;
private boolean gotResourceResponse;
private AzureAsyncOperationPollStrategyData data;

/**
* The name of the header that indicates that a long running operation will use the
Expand All @@ -39,27 +35,55 @@ public final class AzureAsyncOperationPollStrategy extends PollStrategy {
/**
* Create a new AzureAsyncOperationPollStrategy object that will poll the provided operation
* resource URL.
* @param operationResourceUrl The URL of the operation resource this pollStrategy will poll.
* @param originalResourceUrl The URL of the resource that the long running operation is
* operating on.
* @param delayInMilliseconds The delay (in milliseconds) that the pollStrategy will use when
* polling.
* @param data The AzureAsyncOperationPollStrategyData data object.
*/
private AzureAsyncOperationPollStrategy(AzureAsyncOperationPollStrategyData data) {
super(data);
this.data = data;
}

/**
* The AzureAsyncOperationPollStrategy data.
*/
private AzureAsyncOperationPollStrategy(RestProxy restProxy, SwaggerMethodParser methodParser, URL operationResourceUrl, URL originalResourceUrl, long delayInMilliseconds) {
super(restProxy, methodParser, delayInMilliseconds);
private static class AzureAsyncOperationPollStrategyData extends PollStrategyData {
private boolean pollingCompleted;
private boolean pollingSucceeded;
private boolean gotResourceResponse;

URL operationResourceUrl;
URL originalResourceUrl;

/**
* Create a new AzureAsyncOperationPollStrategyData object that will poll the provided operation
* resource URL.
* @param operationResourceUrl The URL of the operation resource this pollStrategy will poll.
* @param originalResourceUrl The URL of the resource that the long running operation is
* operating on.
* @param delayInMilliseconds The delay (in milliseconds) that the pollStrategy will use when
* polling.
*/
AzureAsyncOperationPollStrategyData(RestProxy restProxy, SwaggerMethodParser methodParser, URL operationResourceUrl, URL originalResourceUrl, long delayInMilliseconds) {
super(restProxy, methodParser, delayInMilliseconds);
this.operationResourceUrl = operationResourceUrl;
this.originalResourceUrl = originalResourceUrl;
}

this.operationResourceUrl = operationResourceUrl;
this.originalResourceUrl = originalResourceUrl;
PollStrategy initializeStrategy(RestProxy restProxy,
SwaggerMethodParser methodParser) {
this.restProxy = restProxy;
this.methodParser = methodParser;
return new AzureAsyncOperationPollStrategy(this);
}
}

@Override
public HttpRequest createPollRequest() {
URL pollUrl;
if (!pollingCompleted) {
pollUrl = operationResourceUrl;
if (!data.pollingCompleted) {
pollUrl = data.operationResourceUrl;
}
else if (pollingSucceeded) {
pollUrl = originalResourceUrl;
else if (data.pollingSucceeded) {
pollUrl = data.originalResourceUrl;
} else {
throw new IllegalStateException("Polling is completed and did not succeed. Cannot create a polling request.");
}
Expand All @@ -76,7 +100,7 @@ public Single<HttpResponse> apply(HttpResponse response) {
updateDelayInMillisecondsFrom(response);

Single<HttpResponse> result;
if (!pollingCompleted) {
if (!data.pollingCompleted) {
final HttpResponse bufferedHttpPollResponse = response.buffer();
result = bufferedHttpPollResponse.bodyAsStringAsync()
.map(new Function<String, HttpResponse>() {
Expand All @@ -97,17 +121,17 @@ public HttpResponse apply(String bodyString) {
final String status = operationResource.status();
setStatus(status);

pollingCompleted = OperationState.isCompleted(status);
if (pollingCompleted) {
pollingSucceeded = OperationState.SUCCEEDED.equalsIgnoreCase(status);
data.pollingCompleted = OperationState.isCompleted(status);
if (data.pollingCompleted) {
data.pollingSucceeded = OperationState.SUCCEEDED.equalsIgnoreCase(status);
clearDelayInMilliseconds();

if (!pollingSucceeded) {
if (!data.pollingSucceeded) {
throw new CloudException("Async operation failed with provisioning state: " + status, bufferedHttpPollResponse);
}

if (operationResource.id() != null) {
gotResourceResponse = true;
data.gotResourceResponse = true;
}
}
}
Expand All @@ -117,8 +141,8 @@ public HttpResponse apply(String bodyString) {
});
}
else {
if (pollingSucceeded) {
gotResourceResponse = true;
if (data.pollingSucceeded) {
data.gotResourceResponse = true;
}

result = Single.just(response);
Expand All @@ -131,7 +155,7 @@ public HttpResponse apply(String bodyString) {

@Override
public boolean isDone() {
return pollingCompleted && (!pollingSucceeded || !expectsResourceResponse() || gotResourceResponse);
return data.pollingCompleted && (!data.pollingSucceeded || !expectsResourceResponse() || data.gotResourceResponse);
}

/**
Expand Down Expand Up @@ -159,11 +183,17 @@ static PollStrategy tryToCreate(RestProxy restProxy, SwaggerMethodParser methodP
}

return azureAsyncOperationUrl != null
? new AzureAsyncOperationPollStrategy(restProxy, methodParser, azureAsyncOperationUrl, originalHttpRequest.url(), delayInMilliseconds)
? new AzureAsyncOperationPollStrategy(
new AzureAsyncOperationPollStrategyData(restProxy, methodParser, azureAsyncOperationUrl, originalHttpRequest.url(), delayInMilliseconds))
: null;
}

static String getHeader(HttpResponse httpResponse) {
return httpResponse.headerValue(HEADER_NAME);
}

@Override
public Serializable strategyData() {
return this.data;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import com.microsoft.rest.v2.RestProxy;
import com.microsoft.rest.v2.SwaggerInterfaceParser;
import com.microsoft.rest.v2.SwaggerMethodParser;
import com.microsoft.rest.v2.OperationDescription;
import com.microsoft.rest.v2.credentials.ServiceClientCredentials;
import com.microsoft.rest.v2.http.HttpClient;
import com.microsoft.rest.v2.http.HttpMethod;
Expand Down Expand Up @@ -258,7 +259,8 @@ protected Object handleAsyncHttpResponse(final HttpRequest httpRequest, Single<H
@Override
public ObservableSource<?> apply(HttpResponse httpResponse) throws Exception {
final HttpResponse bufferedHttpResponse = httpResponse.buffer();
return createPollStrategy(httpRequest, Single.just(bufferedHttpResponse), methodParser).flatMapObservable(new Function<PollStrategy, ObservableSource<OperationStatus<?>>>() {
return createPollStrategy(httpRequest, Single.just(bufferedHttpResponse), methodParser)
.flatMapObservable(new Function<PollStrategy, ObservableSource<OperationStatus<?>>>() {
@Override
public ObservableSource<OperationStatus<?>> apply(final PollStrategy pollStrategy) throws Exception {
Observable<OperationStatus<?>> first = handleBodyReturnTypeAsync(bufferedHttpResponse, methodParser, operationStatusResultType)
Expand Down Expand Up @@ -295,6 +297,24 @@ public Single<HttpResponse> apply(PollStrategy pollStrategy) {
return result;
}

@Override
protected Object handleResumeOperation(final HttpRequest httpRequest,
OperationDescription operationDescription,
final SwaggerMethodParser methodParser,
Type returnType)
throws Exception {
final Type operationStatusType = ((ParameterizedType) returnType).getActualTypeArguments()[0];
final TypeToken operationStatusTypeToken = TypeToken.of(operationStatusType);
if (!operationStatusTypeToken.isSubtypeOf(OperationStatus.class)) {
throw new InvalidReturnTypeException("AzureProxy only supports swagger interface methods that return Observable (such as " + methodParser.fullyQualifiedMethodName() + "()) if the Observable's inner type that is OperationStatus (not " + returnType.toString() + ").");
}

PollStrategy.PollStrategyData pollStrategyData =
(PollStrategy.PollStrategyData) operationDescription.pollStrategyData();
PollStrategy pollStrategy = pollStrategyData.initializeStrategy(this, methodParser);
return pollStrategy.pollUntilDoneWithStatusUpdates(httpRequest, methodParser, operationStatusType);
}

private Single<PollStrategy> createPollStrategy(final HttpRequest originalHttpRequest, final Single<HttpResponse> asyncOriginalHttpResponse, final SwaggerMethodParser methodParser) {
return asyncOriginalHttpResponse
.flatMap(new Function<HttpResponse, Single<PollStrategy>>() {
Expand Down Expand Up @@ -349,7 +369,8 @@ else if (originalHttpRequestMethod == HttpMethod.PUT || originalHttpRequestMetho
}

if (pollStrategy == null && result == null) {
pollStrategy = new CompletedPollStrategy(AzureProxy.this, methodParser, originalHttpResponse);
pollStrategy = new CompletedPollStrategy(
new CompletedPollStrategy.CompletedPollStrategyData(AzureProxy.this, methodParser, originalHttpResponse));
}

if (pollStrategy != null) {
Expand All @@ -371,7 +392,8 @@ private Single<PollStrategy> createProvisioningStateOrCompletedPollStrategy(fina
|| httpRequestMethod == HttpMethod.GET
|| httpRequestMethod == HttpMethod.HEAD
|| !methodParser.expectsResponseBody()) {
result = Single.<PollStrategy>just(new CompletedPollStrategy(AzureProxy.this, methodParser, httpResponse));
result = Single.<PollStrategy>just(new CompletedPollStrategy(
new CompletedPollStrategy.CompletedPollStrategyData(AzureProxy.this, methodParser, httpResponse)));
} else {
final HttpResponse bufferedOriginalHttpResponse = httpResponse.buffer();
result = bufferedOriginalHttpResponse.bodyAsStringAsync()
Expand All @@ -387,9 +409,13 @@ public PollStrategy apply(String originalHttpResponseBody) {
final SerializerAdapter<?> serializer = serializer();
final ResourceWithProvisioningState resource = serializer.deserialize(originalHttpResponseBody, ResourceWithProvisioningState.class, SerializerEncoding.JSON);
if (resource != null && resource.properties() != null && !OperationState.isCompleted(resource.properties().provisioningState())) {
result = new ProvisioningStatePollStrategy(AzureProxy.this, methodParser, httpRequest, resource.properties().provisioningState(), delayInMilliseconds);
result = new ProvisioningStatePollStrategy(
new ProvisioningStatePollStrategy.ProvisioningStatePollStrategyData(
AzureProxy.this, methodParser, httpRequest, resource.properties().provisioningState(), delayInMilliseconds));
} else {
result = new CompletedPollStrategy(AzureProxy.this, methodParser, bufferedOriginalHttpResponse);
result = new CompletedPollStrategy(
new CompletedPollStrategy.CompletedPollStrategyData(
AzureProxy.this, methodParser, bufferedOriginalHttpResponse));
}
} catch (IOException e) {
throw Exceptions.propagate(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.reactivex.Observable;
import io.reactivex.Single;

import java.io.Serializable;
import java.lang.reflect.Type;

/**
Expand All @@ -22,20 +23,46 @@
*/
public class CompletedPollStrategy extends PollStrategy {
private final BufferedHttpResponse firstHttpResponse;
private CompletedPollStrategyData data;

/**
* Create a new CompletedPollStrategy.
* @param restProxy The RestProxy that created this PollStrategy.
* @param methodParser The method parser that describes the service interface method that
* initiated the long running operation.
* @param firstHttpResponse The HTTP response to the original HTTP request.
* @param data The poll strategy data.
*/
public CompletedPollStrategy(RestProxy restProxy, SwaggerMethodParser methodParser, HttpResponse firstHttpResponse) {
super(restProxy, methodParser, 0);
this.firstHttpResponse = firstHttpResponse.buffer();
public CompletedPollStrategy(CompletedPollStrategyData data) {
super(data);
this.firstHttpResponse = data.firstHttpResponse.buffer();
setStatus(OperationState.SUCCEEDED);
this.data = data;
}

/**
* The CompletedPollStrategy data.
*/
public static class CompletedPollStrategyData extends PollStrategyData {
HttpResponse firstHttpResponse;

/**
* Create a new CompletedPollStrategyData.
* @param restProxy The RestProxy that created this PollStrategy.
* @param methodParser The method parser that describes the service interface method that
* initiated the long running operation.
* @param firstHttpResponse The HTTP response to the original HTTP request.
*/
public CompletedPollStrategyData(RestProxy restProxy, SwaggerMethodParser methodParser, HttpResponse firstHttpResponse) {
super(restProxy, methodParser, 0);
this.firstHttpResponse = firstHttpResponse;
}

PollStrategy initializeStrategy(RestProxy restProxy,
SwaggerMethodParser methodParser) {
this.restProxy = restProxy;
this.methodParser = methodParser;
return new CompletedPollStrategy(this);
}
}

public
@Override
HttpRequest createPollRequest() {
throw new UnsupportedOperationException();
Expand All @@ -58,4 +85,9 @@ Observable<OperationStatus<Object>> pollUntilDoneWithStatusUpdates(final HttpReq
Single<HttpResponse> pollUntilDone() {
return Single.<HttpResponse>just(firstHttpResponse);
}

@Override
public Serializable strategyData() {
return this.data;
}
}
Loading

0 comments on commit 29b33eb

Please sign in to comment.