Skip to content

Commit

Permalink
Observable based async works for NSG
Browse files Browse the repository at this point in the history
  • Loading branch information
jianghaolu committed Aug 12, 2016
1 parent c039c6e commit 16c5acd
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;
import rx.Observable;

/**
* Represents a group of related tasks.
Expand Down Expand Up @@ -60,10 +59,9 @@ public interface TaskGroup<T, U extends TaskItem<T>> {
/**
* Executes the tasks in the group asynchronously.
*
* @param callback the callback to call on failure or success
* @return the handle to the REST call
*/
ServiceCall executeAsync(ServiceCallback<T> callback);
Observable<T> executeAsync();

/**
* Gets the result of execution of a task in the group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@

package com.microsoft.azure;

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;
import com.microsoft.rest.ServiceResponse;

import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.lang3.tuple.MutablePair;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.functions.FuncN;
import rx.schedulers.Schedulers;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* The base implementation of TaskGroup interface.
Expand All @@ -22,7 +29,6 @@
public abstract class TaskGroupBase<T, U extends TaskItem<T>>
implements TaskGroup<T, U> {
private DAGraph<U, DAGNode<U>> dag;
private ParallelServiceCall parallelServiceCall;

/**
* Creates TaskGroupBase.
Expand All @@ -32,7 +38,6 @@ public abstract class TaskGroupBase<T, U extends TaskItem<T>>
*/
public TaskGroupBase(String rootTaskItemId, U rootTaskItem) {
this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem));
this.parallelServiceCall = new ParallelServiceCall();
}

@Override
Expand Down Expand Up @@ -68,9 +73,8 @@ public void execute() throws Exception {
}

@Override
public ServiceCall<T> executeAsync(final ServiceCallback<T> callback) {
executeReadyTasksAsync(callback);
return parallelServiceCall;
public Observable<T> executeAsync() {
return executeReadyTasksAsync();
}

@Override
Expand All @@ -81,94 +85,39 @@ public T taskResult(String taskId) {
/**
* Executes all runnable tasks, a task is runnable when all the tasks its depends
* on are finished running.
*
* @param callback the callback
*/
private void executeReadyTasksAsync(final ServiceCallback<T> callback) {
private Observable<T> executeReadyTasksAsync() {
DAGNode<U> nextNode = dag.getNext();
Observable<T> rootObservable = null;
final List<Observable<T>> observables = new ArrayList<>();
while (nextNode != null) {
ServiceCall serviceCall = nextNode.data().executeAsync(taskCallback(nextNode, callback));
this.parallelServiceCall.addCall(serviceCall);
nextNode = dag.getNext();
}
}

/**
* This method create and return a callback for the runnable task stored in the given node.
* This callback wraps the given callback.
*
* @param taskNode the node containing runnable task
* @param callback the callback to wrap
* @return the task callback
*/
private ServiceCallback<T> taskCallback(final DAGNode<U> taskNode, final ServiceCallback<T> callback) {
final TaskGroupBase<T, U> self = this;
return new ServiceCallback<T>() {
@Override
public void failure(Throwable t) {
callback.failure(t);
parallelServiceCall.failure(t);
}

@Override
public void success(ServiceResponse<T> result) {
self.dag().reportedCompleted(taskNode);
if (self.dag().isRootNode(taskNode)) {
if (callback != null) {
callback.success(result);
}
parallelServiceCall.success(result);
} else {
self.executeReadyTasksAsync(callback);
}
}
};
}

/**
* Type represents a set of REST calls running possibly in parallel.
*/
private class ParallelServiceCall extends ServiceCall<T> {
private ConcurrentLinkedQueue<ServiceCall<?>> serviceCalls;

/**
* Creates a ParallelServiceCall.
*/
ParallelServiceCall() {
super(null);
this.serviceCalls = new ConcurrentLinkedQueue<>();
}

/**
* Cancels all the service calls currently executing.
*/
public void cancel() {
for (ServiceCall<?> call : this.serviceCalls) {
call.cancel(true);
final DAGNode<U> thisNode = nextNode;
if (dag().isRootNode(nextNode)) {
rootObservable = nextNode.data().executeAsync()
.doOnNext(new Action1<T>() {
@Override
public void call(T t) {
dag().reportedCompleted(thisNode);
}
});
} else {
Observable<T> nextNodeObservable = nextNode.data().executeAsync()
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
return executeReadyTasksAsync();
}
});
observables.add(nextNodeObservable);
}
nextNode = dag.getNext();
}

/**
* @return true if the call has been canceled; false otherwise.
*/
public boolean isCancelled() {
for (ServiceCall<?> call : this.serviceCalls) {
if (!call.isCancelled()) {
return false;
}
}
return true;
if (rootObservable != null) {
return rootObservable;
}

/**
* Add a call to the list of parallel calls.
*
* @param call the call
*/
private void addCall(ServiceCall call) {
if (call != null) {
this.serviceCalls.add(call);
}
else {
return Observable.merge(observables).last();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.microsoft.rest.ServiceCall;
import com.microsoft.rest.ServiceCallback;
import rx.Observable;

/**
* Type representing a task in a task group {@link TaskGroup}.
Expand All @@ -35,8 +36,7 @@ public interface TaskItem<U> {
* <p>
* once executed the result will be available through result getter
*
* @param callback callback to call on success or failure
* @return the handle of the REST call
*/
ServiceCall executeAsync(ServiceCallback<U> callback);
Observable<U> executeAsync();
}
4 changes: 4 additions & 0 deletions client-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
28 changes: 28 additions & 0 deletions client-runtime/src/main/java/com/microsoft/rest/ServiceCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import com.google.common.util.concurrent.AbstractFuture;

import retrofit2.Call;
import rx.Observable;
import rx.Scheduler;
import rx.functions.Func1;

/**
* An instance of this class provides access to the underlying REST call invocation.
Expand Down Expand Up @@ -72,6 +75,31 @@ public boolean isCancelled() {
return call.isCanceled();
}

/**
* Get an RxJava Observable object for the response.
*
* @return the Observable
*/
public Observable<T> observable() {
return Observable.from(this)
.map(new Func1<ServiceResponse<T>, T>() {
@Override
public T call(ServiceResponse<T> tServiceResponse) {
return tServiceResponse.getBody();
}
});
}

/**
* Get an RxJava Observable object for the response bound on a scheduler.
*
* @param scheduler the scheduler to bind to
* @return the Observable
*/
public Observable<T> observable(Scheduler scheduler) {
return observable().subscribeOn(scheduler);
}

/**
* Invoke this method to report completed, allowing
* {@link AbstractFuture#get()} to be unblocked.
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@
<artifactId>adal</artifactId>
<version>1.1.11</version>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxjava</artifactId>
<version>1.1.8</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down

0 comments on commit 16c5acd

Please sign in to comment.