Skip to content

Commit

Permalink
Fixing the issue of circular depedency due to the use of non-unique key
Browse files Browse the repository at this point in the history
  • Loading branch information
anuchandy committed Jul 26, 2016
1 parent afc4173 commit 6625ef4
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

package com.microsoft.azure;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -51,6 +52,13 @@ public void addNode(U node) {
graph.put(node.key(), node);
}

/**
* @return all nodes in the graph.
*/
public Collection<U> getNodes() {
return graph.values();
}

/**
* Perform DFS visit in this graph.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
* The base implementation of TaskGroup interface.
*
* @param <T> the result type of the tasks in the group
* @param <U> the task item
*/
public abstract class TaskGroupBase<T>
implements TaskGroup<T, TaskItem<T>> {
private DAGraph<TaskItem<T>, DAGNode<TaskItem<T>>> dag;
public abstract class TaskGroupBase<T, U extends TaskItem<T>>
implements TaskGroup<T, U> {
private DAGraph<U, DAGNode<U>> dag;
private ParallelServiceCall parallelServiceCall;

/**
Expand All @@ -29,13 +30,13 @@ public abstract class TaskGroupBase<T>
* @param rootTaskItemId the id of the root task in this task group
* @param rootTaskItem the root task
*/
public TaskGroupBase(String rootTaskItemId, TaskItem<T> rootTaskItem) {
public TaskGroupBase(String rootTaskItemId, U rootTaskItem) {
this.dag = new DAGraph<>(new DAGNode<>(rootTaskItemId, rootTaskItem));
this.parallelServiceCall = new ParallelServiceCall();
}

@Override
public DAGraph<TaskItem<T>, DAGNode<TaskItem<T>>> dag() {
public DAGraph<U, DAGNode<U>> dag() {
return dag;
}

Expand All @@ -45,7 +46,7 @@ public boolean isPreparer() {
}

@Override
public void merge(TaskGroup<T, TaskItem<T>> parentTaskGroup) {
public void merge(TaskGroup<T, U> parentTaskGroup) {
dag.merge(parentTaskGroup.dag());
}

Expand All @@ -58,7 +59,7 @@ public void prepare() {

@Override
public void execute() throws Exception {
DAGNode<TaskItem<T>> nextNode = dag.getNext();
DAGNode<U> nextNode = dag.getNext();
while (nextNode != null) {
nextNode.data().execute();
this.dag().reportedCompleted(nextNode);
Expand All @@ -84,7 +85,7 @@ public T taskResult(String taskId) {
* @param callback the callback
*/
private void executeReadyTasksAsync(final ServiceCallback<T> callback) {
DAGNode<TaskItem<T>> nextNode = dag.getNext();
DAGNode<U> nextNode = dag.getNext();
while (nextNode != null) {
ServiceCall serviceCall = nextNode.data().executeAsync(taskCallback(nextNode, callback));
this.parallelServiceCall.addCall(serviceCall);
Expand All @@ -100,8 +101,8 @@ private void executeReadyTasksAsync(final ServiceCallback<T> callback) {
* @param callback the callback to wrap
* @return the task callback
*/
private ServiceCallback<T> taskCallback(final DAGNode<TaskItem<T>> taskNode, final ServiceCallback<T> callback) {
final TaskGroupBase<T> self = this;
private ServiceCallback<T> taskCallback(final DAGNode<U> taskNode, final ServiceCallback<T> callback) {
final TaskGroupBase<T, U> self = this;
return new ServiceCallback<T>() {
@Override
public void failure(Throwable t) {
Expand Down

0 comments on commit 6625ef4

Please sign in to comment.