From 16c5acde66b57fd641afcde6af1234ae804701fe Mon Sep 17 00:00:00 2001 From: Jianghao Lu Date: Fri, 12 Aug 2016 00:37:34 -0700 Subject: [PATCH] Observable based async works for NSG --- .../java/com/microsoft/azure/TaskGroup.java | 6 +- .../com/microsoft/azure/TaskGroupBase.java | 133 ++++++------------ .../java/com/microsoft/azure/TaskItem.java | 4 +- client-runtime/pom.xml | 4 + .../java/com/microsoft/rest/ServiceCall.java | 28 ++++ pom.xml | 5 + 6 files changed, 82 insertions(+), 98 deletions(-) diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java index 3ad514d93874a..75722a494feac 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroup.java @@ -7,8 +7,7 @@ package com.microsoft.azure; -import com.microsoft.rest.ServiceCall; -import com.microsoft.rest.ServiceCallback; +import rx.Observable; /** * Represents a group of related tasks. @@ -60,10 +59,9 @@ public interface TaskGroup> { /** * Executes the tasks in the group asynchronously. * - * @param callback the callback to call on failure or success * @return the handle to the REST call */ - ServiceCall executeAsync(ServiceCallback callback); + Observable executeAsync(); /** * Gets the result of execution of a task in the group. diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java index 8f178480c43be..108e8b6625b49 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskGroupBase.java @@ -7,11 +7,18 @@ package com.microsoft.azure; -import com.microsoft.rest.ServiceCall; -import com.microsoft.rest.ServiceCallback; -import com.microsoft.rest.ServiceResponse; - -import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.commons.lang3.tuple.MutablePair; +import rx.Observable; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.functions.FuncN; +import rx.schedulers.Schedulers; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** * The base implementation of TaskGroup interface. @@ -22,7 +29,6 @@ public abstract class TaskGroupBase> implements TaskGroup { private DAGraph> dag; - private ParallelServiceCall parallelServiceCall; /** * Creates TaskGroupBase. @@ -32,7 +38,6 @@ public abstract class TaskGroupBase> */ public TaskGroupBase(String rootTaskItemId, U rootTaskItem) { this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem)); - this.parallelServiceCall = new ParallelServiceCall(); } @Override @@ -68,9 +73,8 @@ public void execute() throws Exception { } @Override - public ServiceCall executeAsync(final ServiceCallback callback) { - executeReadyTasksAsync(callback); - return parallelServiceCall; + public Observable executeAsync() { + return executeReadyTasksAsync(); } @Override @@ -81,94 +85,39 @@ public T taskResult(String taskId) { /** * Executes all runnable tasks, a task is runnable when all the tasks its depends * on are finished running. - * - * @param callback the callback */ - private void executeReadyTasksAsync(final ServiceCallback callback) { + private Observable executeReadyTasksAsync() { DAGNode nextNode = dag.getNext(); + Observable rootObservable = null; + final List> observables = new ArrayList<>(); while (nextNode != null) { - ServiceCall serviceCall = nextNode.data().executeAsync(taskCallback(nextNode, callback)); - this.parallelServiceCall.addCall(serviceCall); - nextNode = dag.getNext(); - } - } - - /** - * This method create and return a callback for the runnable task stored in the given node. - * This callback wraps the given callback. - * - * @param taskNode the node containing runnable task - * @param callback the callback to wrap - * @return the task callback - */ - private ServiceCallback taskCallback(final DAGNode taskNode, final ServiceCallback callback) { - final TaskGroupBase self = this; - return new ServiceCallback() { - @Override - public void failure(Throwable t) { - callback.failure(t); - parallelServiceCall.failure(t); - } - - @Override - public void success(ServiceResponse result) { - self.dag().reportedCompleted(taskNode); - if (self.dag().isRootNode(taskNode)) { - if (callback != null) { - callback.success(result); - } - parallelServiceCall.success(result); - } else { - self.executeReadyTasksAsync(callback); - } - } - }; - } - - /** - * Type represents a set of REST calls running possibly in parallel. - */ - private class ParallelServiceCall extends ServiceCall { - private ConcurrentLinkedQueue> serviceCalls; - - /** - * Creates a ParallelServiceCall. - */ - ParallelServiceCall() { - super(null); - this.serviceCalls = new ConcurrentLinkedQueue<>(); - } - - /** - * Cancels all the service calls currently executing. - */ - public void cancel() { - for (ServiceCall call : this.serviceCalls) { - call.cancel(true); + final DAGNode thisNode = nextNode; + if (dag().isRootNode(nextNode)) { + rootObservable = nextNode.data().executeAsync() + .doOnNext(new Action1() { + @Override + public void call(T t) { + dag().reportedCompleted(thisNode); + } + }); + } else { + Observable nextNodeObservable = nextNode.data().executeAsync() + .flatMap(new Func1>() { + @Override + public Observable call(T t) { + dag().reportedCompleted(thisNode); + return executeReadyTasksAsync(); + } + }); + observables.add(nextNodeObservable); } + nextNode = dag.getNext(); } - - /** - * @return true if the call has been canceled; false otherwise. - */ - public boolean isCancelled() { - for (ServiceCall call : this.serviceCalls) { - if (!call.isCancelled()) { - return false; - } - } - return true; + if (rootObservable != null) { + return rootObservable; } - - /** - * Add a call to the list of parallel calls. - * - * @param call the call - */ - private void addCall(ServiceCall call) { - if (call != null) { - this.serviceCalls.add(call); - } + else { + return Observable.merge(observables).last(); } } } diff --git a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java index 8f0a3459a2e92..48c4e312d9107 100644 --- a/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java +++ b/azure-client-runtime/src/main/java/com/microsoft/azure/TaskItem.java @@ -9,6 +9,7 @@ import com.microsoft.rest.ServiceCall; import com.microsoft.rest.ServiceCallback; +import rx.Observable; /** * Type representing a task in a task group {@link TaskGroup}. @@ -35,8 +36,7 @@ public interface TaskItem { *

* once executed the result will be available through result getter * - * @param callback callback to call on success or failure * @return the handle of the REST call */ - ServiceCall executeAsync(ServiceCallback callback); + Observable executeAsync(); } diff --git a/client-runtime/pom.xml b/client-runtime/pom.xml index e63a421434b38..d904227d91514 100644 --- a/client-runtime/pom.xml +++ b/client-runtime/pom.xml @@ -79,6 +79,10 @@ org.apache.commons commons-lang3 + + io.reactivex + rxjava + junit junit diff --git a/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java b/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java index c0cb3b62ebddd..1baf40cbd9718 100644 --- a/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java +++ b/client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java @@ -10,6 +10,9 @@ import com.google.common.util.concurrent.AbstractFuture; import retrofit2.Call; +import rx.Observable; +import rx.Scheduler; +import rx.functions.Func1; /** * An instance of this class provides access to the underlying REST call invocation. @@ -72,6 +75,31 @@ public boolean isCancelled() { return call.isCanceled(); } + /** + * Get an RxJava Observable object for the response. + * + * @return the Observable + */ + public Observable observable() { + return Observable.from(this) + .map(new Func1, T>() { + @Override + public T call(ServiceResponse tServiceResponse) { + return tServiceResponse.getBody(); + } + }); + } + + /** + * Get an RxJava Observable object for the response bound on a scheduler. + * + * @param scheduler the scheduler to bind to + * @return the Observable + */ + public Observable observable(Scheduler scheduler) { + return observable().subscribeOn(scheduler); + } + /** * Invoke this method to report completed, allowing * {@link AbstractFuture#get()} to be unblocked. diff --git a/pom.xml b/pom.xml index aa77260c5893f..082e2e5bfeb8e 100644 --- a/pom.xml +++ b/pom.xml @@ -121,6 +121,11 @@ adal 1.1.11 + + io.reactivex + rxjava + 1.1.8 + junit junit