Skip to content

Commit

Permalink
Squashed 'runtimes/' changes from ca25c1c..6c35000
Browse files Browse the repository at this point in the history
6c35000 Fix tests for fluent code
a1cd0f1 Paging async works
148e463 Generate correct paging code
57cbe08 LRO tests pass
bae30e0 Regenerate fluent
0882500 Merge commit '6eb93f97e4e2faab3b8e56090fc697c32dd2f410' into rx
2e44bb0 User observable for paging
0fffe57 Merge 9925f2c into da82c36
b7c1094 Remove unused imports
b19b716 LRO now uses Rx
75a68c3 Generate Observable based clients in generic generator
0a7e455 Create VM works
5bb56b9 Fixing the javadoc error and formatting errors for key vault
16c5acd Observable based async works for NSG
c039c6e Merge pull request Azure#1015 from anuchandy/resources-create
44a1519 Merge branch 'master' of github.com:Azure/azure-sdk-for-java into resources-create
652d622 Exposing Create and CreateAsync in collection that takes variable number of Creatables

git-subtree-dir: runtimes
git-subtree-split: 6c35000
  • Loading branch information
jianghaolu committed Aug 26, 2016
1 parent 28bc124 commit 65d59d6
Show file tree
Hide file tree
Showing 18 changed files with 569 additions and 958 deletions.
1,015 changes: 320 additions & 695 deletions azure-client-runtime/src/main/java/com/microsoft/azure/AzureClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for license information.
*
*/

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceResponse;

import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

/**
* An instance of this class provides access to the underlying REST call invocation.
* This class wraps around the Retrofit Call object and allows updates to it in the
* progress of a long running operation or a paging operation.
*
* @param <T> the type of the returning object
*/
public final class AzureServiceCall<T> extends ServiceCall<T> {
private AzureServiceCall() {
}

/**
* Creates a ServiceCall from a paging operation.
*
* @param first the observable to the first page
* @param next the observable to poll subsequent pages
* @param callback the client-side callback
* @param <T> the Page type
* @param <V> the element type
* @return the future based ServiceCall
*/
public static <T extends Page<V>, V> ServiceCall<T> create(Observable<ServiceResponse<T>> first, final Func1<String, Observable<ServiceResponse<T>>> next, final ListOperationCallback<V> callback) {
final AzureServiceCall<T> serviceCall = new AzureServiceCall<>();
final PagingSubscriber<T, V> subscriber = new PagingSubscriber<>(serviceCall, next, callback);
serviceCall.setSubscription(first
.single()
.subscribe(subscriber));
return serviceCall;
}

/**
* The subscriber that handles user callback and automatically subscribes to the next page.
*
* @param <T> the Page type
* @param <V> the element type
*/
private static class PagingSubscriber<T extends Page<V>, V> extends Subscriber<ServiceResponse<T>> {
private AzureServiceCall<T> serviceCall;
private Func1<String, Observable<ServiceResponse<T>>> next;
private ListOperationCallback<V> callback;
private ServiceResponse<T> lastResponse;

PagingSubscriber(final AzureServiceCall<T> serviceCall, final Func1<String, Observable<ServiceResponse<T>>> next, final ListOperationCallback<V> callback) {
this.serviceCall = serviceCall;
this.next = next;
this.callback = callback;
}

@Override
public void onCompleted() {
// do nothing
}

@Override
public void onError(Throwable e) {
serviceCall.setException(e);
if (callback != null) {
callback.failure(e);
}
}

@Override
public void onNext(ServiceResponse<T> serviceResponse) {
lastResponse = serviceResponse;
ListOperationCallback.PagingBehavior behavior = ListOperationCallback.PagingBehavior.CONTINUE;
if (callback != null) {
behavior = callback.progress(serviceResponse.getBody().getItems());
if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) {
callback.success();
}
}
if (behavior == ListOperationCallback.PagingBehavior.STOP || serviceResponse.getBody().getNextPageLink() == null) {
serviceCall.set(lastResponse);
} else {
serviceCall.setSubscription(next.call(serviceResponse.getBody().getNextPageLink()).single().subscribe(this));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package com.microsoft.azure;

import com.microsoft.rest.ServiceCallback;
import com.microsoft.rest.ServiceResponse;

import java.util.List;

Expand Down Expand Up @@ -36,16 +37,14 @@ public ListOperationCallback() {

/**
* Override this method to handle progressive results.
* The user is responsible for returning a {@link PagingBahavior} Enum to indicate
* The user is responsible for returning a {@link PagingBehavior} Enum to indicate
* whether the client should continue loading or stop.
*
* @param partial the list of resources from the current request.
* @return CONTINUE if you want to go on loading, STOP otherwise.
*
*/
public PagingBahavior progress(List<E> partial) {
return PagingBahavior.CONTINUE;
}
public abstract PagingBehavior progress(List<E> partial);

/**
* Get the list result that stores the accumulated resources loaded from server.
Expand All @@ -71,6 +70,16 @@ public void load(List<E> result) {
}
}

@Override
public void success(ServiceResponse<List<E>> result) {
success();
}

/**
* Override this method to handle successful REST call results.
*/
public abstract void success();

/**
* Get the number of loaded pages.
*
Expand All @@ -83,7 +92,7 @@ public int pageCount() {
/**
* An enum to indicate whether the client should continue loading or stop.
*/
public enum PagingBahavior {
public enum PagingBehavior {
/**
* Indicates that the client should continue loading.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

import com.microsoft.rest.RestException;

import javax.xml.bind.DataBindingException;
import javax.xml.ws.WebServiceException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -19,6 +17,8 @@
import java.util.ListIterator;
import java.util.NoSuchElementException;

import javax.xml.bind.DataBindingException;

/**
* Defines a list response from a paging operation. The pages are
* lazy initialized when an instance of this class is iterated.
Expand Down Expand Up @@ -81,8 +81,6 @@ public void loadNextPage() {
this.nextPageLink = nextPage.getNextPageLink();
this.items.addAll(nextPage.getItems());
this.currentPage = nextPage;
} catch (RestException e) {
throw new WebServiceException(e.toString(), e);
} catch (IOException e) {
throw new DataBindingException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void updateFromResponseOnPutPatch(Response<ResponseBody> response) throws
}

if (responseContent == null || responseContent.isEmpty()) {
CloudException exception = new CloudException("no body");
CloudException exception = new CloudException("polling response does not contain a valid body");
exception.setResponse(response);
throw exception;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import okhttp3.OkHttpClient;
import okhttp3.logging.HttpLoggingInterceptor;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;

import java.lang.reflect.Field;
import java.net.CookieManager;
Expand Down Expand Up @@ -338,6 +339,7 @@ public RestClient build() {
.baseUrl(baseUrl)
.client(httpClient)
.addConverterFactory(mapperAdapter.getConverterFactory())
.addCallAdapterFactory(RxJavaCallAdapterFactory.create())
.build(),
credentials,
customHeadersInterceptor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;
import rx.Observable;

/**
* Represents a group of related tasks.
Expand Down Expand Up @@ -60,10 +59,9 @@ public interface TaskGroup<T, U extends TaskItem<T>> {
/**
* Executes the tasks in the group asynchronously.
*
* @param callback the callback to call on failure or success
* @return the handle to the REST call
*/
ServiceCall executeAsync(ServiceCallback<T> callback);
Observable<T> executeAsync();

/**
* Gets the result of execution of a task in the group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;
import com.microsoft.rest.ServiceResponse;
import rx.Observable;
import rx.functions.Func1;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.ArrayList;
import java.util.List;

/**
* The base implementation of TaskGroup interface.
Expand All @@ -22,7 +22,6 @@
public abstract class TaskGroupBase<T, U extends TaskItem<T>>
implements TaskGroup<T, U> {
private DAGraph<U, DAGNode<U>> dag;
private ParallelServiceCall parallelServiceCall;

/**
* Creates TaskGroupBase.
Expand All @@ -32,7 +31,6 @@ public abstract class TaskGroupBase<T, U extends TaskItem<T>>
*/
public TaskGroupBase(String rootTaskItemId, U rootTaskItem) {
this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem));
this.parallelServiceCall = new ParallelServiceCall();
}

@Override
Expand Down Expand Up @@ -68,9 +66,8 @@ public void execute() throws Exception {
}

@Override
public ServiceCall<T> executeAsync(final ServiceCallback<T> callback) {
executeReadyTasksAsync(callback);
return parallelServiceCall;
public Observable<T> executeAsync() {
return executeReadyTasksAsync();
}

@Override
Expand All @@ -81,92 +78,26 @@ public T taskResult(String taskId) {
/**
* Executes all runnable tasks, a task is runnable when all the tasks its depends
* on are finished running.
*
* @param callback the callback
*/
private void executeReadyTasksAsync(final ServiceCallback<T> callback) {
private Observable<T> executeReadyTasksAsync() {
DAGNode<U> nextNode = dag.getNext();
final List<Observable<T>> observables = new ArrayList<>();
while (nextNode != null) {
ServiceCall serviceCall = nextNode.data().executeAsync(taskCallback(nextNode, callback));
this.parallelServiceCall.addCall(serviceCall);
final DAGNode<U> thisNode = nextNode;
observables.add(nextNode.data().executeAsync()
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
if (dag().isRootNode(thisNode)) {
return Observable.just(t);
} else {
return executeReadyTasksAsync();
}
}
}));
nextNode = dag.getNext();
}
}

/**
* This method create and return a callback for the runnable task stored in the given node.
* This callback wraps the given callback.
*
* @param taskNode the node containing runnable task
* @param callback the callback to wrap
* @return the task callback
*/
private ServiceCallback<T> taskCallback(final DAGNode<U> taskNode, final ServiceCallback<T> callback) {
final TaskGroupBase<T, U> self = this;
return new ServiceCallback<T>() {
@Override
public void failure(Throwable t) {
callback.failure(t);
parallelServiceCall.failure(t);
}

@Override
public void success(ServiceResponse<T> result) {
self.dag().reportedCompleted(taskNode);
if (self.dag().isRootNode(taskNode)) {
if (callback != null) {
callback.success(result);
}
parallelServiceCall.success(result);
} else {
self.executeReadyTasksAsync(callback);
}
}
};
}

/**
* Type represents a set of REST calls running possibly in parallel.
*/
private class ParallelServiceCall extends ServiceCall<T> {
private ConcurrentLinkedQueue<ServiceCall<?>> serviceCalls;

/**
* Creates a ParallelServiceCall.
*/
ParallelServiceCall() {
super(null);
this.serviceCalls = new ConcurrentLinkedQueue<>();
}

/**
* Cancels all the service calls currently executing.
*/
public void cancel() {
for (ServiceCall<?> call : this.serviceCalls) {
call.cancel(true);
}
}

/**
* @return true if the call has been canceled; false otherwise.
*/
public boolean isCancelled() {
for (ServiceCall<?> call : this.serviceCalls) {
if (!call.isCancelled()) {
return false;
}
}
return true;
}

/**
* Add a call to the list of parallel calls.
*
* @param call the call
*/
private void addCall(ServiceCall<?> call) {
this.serviceCalls.add(call);
}
return Observable.merge(observables).last();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;
import rx.Observable;

/**
* Type representing a task in a task group {@link TaskGroup}.
Expand All @@ -35,8 +34,7 @@ public interface TaskItem<U> {
* <p>
* once executed the result will be available through result getter
*
* @param callback callback to call on success or failure
* @return the handle of the REST call
*/
ServiceCall executeAsync(ServiceCallback<U> callback);
Observable<U> executeAsync();
}
Loading

0 comments on commit 65d59d6

Please sign in to comment.