Skip to content

Commit

Permalink
merge async changes in 3.x to 2.7 (#3997)
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj authored and cvictory committed May 23, 2019
1 parent df084ad commit ebafc18
Show file tree
Hide file tree
Showing 130 changed files with 1,797 additions and 1,639 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

Expand Down Expand Up @@ -103,7 +103,7 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
addFailed(loadbalance, invocation, invokers, invoker);
return new RpcResult(); // ignore
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

import java.util.List;

/**
* When invoke fails, log the error message and ignore this error by returning an empty RpcResult.
* When invoke fails, log the error message and ignore this error by returning an empty Result.
* Usually used to write audit logs and other operations
*
* <a href="http://en.wikipedia.org/wiki/Fail-safe">Fail-safe</a>
Expand All @@ -50,7 +50,7 @@ public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBal
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult(); // ignore
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;

/**
* NOTICE! This implementation does not work well with async call.
*
* Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
*
* <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
Expand Down Expand Up @@ -70,7 +72,6 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
} else {
selected = new ArrayList<>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.Merger;
Expand All @@ -40,17 +40,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
import static org.apache.dubbo.rpc.Constants.MERGER_KEY;

/**
* @param <T>
*/
@SuppressWarnings("unchecked")
public class MergeableClusterInvoker<T> extends AbstractClusterInvoker<T> {

Expand Down Expand Up @@ -90,26 +89,21 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
returnType = null;
}

Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
Map<String, Result> results = new HashMap<>();
for (final Invoker<T> invoker : invokers) {
Future<Result> future = executor.submit(new Callable<Result>() {
@Override
public Result call() throws Exception {
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
results.put(invoker.getUrl().getServiceKey(), future);
RpcInvocation subInvocation = new RpcInvocation(invocation, invoker);
subInvocation.setAttachment(ASYNC_KEY, "true");
results.put(invoker.getUrl().getServiceKey(), invoker.invoke(subInvocation));
}

Object result = null;

List<Result> resultList = new ArrayList<Result>(results.size());

int timeout = getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, DEFAULT_TIMEOUT);
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
for (Map.Entry<String, Result> entry : results.entrySet()) {
Result asyncResult = entry.getValue();
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
Result r = asyncResult.get();
if (r.hasException()) {
log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
" failed: " + r.getException().getMessage(),
Expand All @@ -123,13 +117,13 @@ public Result call() throws Exception {
}

if (resultList.isEmpty()) {
return new RpcResult((Object) null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else if (resultList.size() == 1) {
return resultList.iterator().next();
}

if (returnType == void.class) {
return new RpcResult((Object) null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}

if (merger.startsWith(".")) {
Expand Down Expand Up @@ -177,7 +171,7 @@ public Result call() throws Exception {
throw new RpcException("There is no merger to merge result.");
}
}
return new RpcResult(result);
return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.support.MockInvoker;

Expand Down Expand Up @@ -115,7 +115,7 @@ private Result doMockInvoke(Invocation invocation, RpcException e) {
result = minvoker.invoke(invocation);
} catch (RpcException me) {
if (me.isBiz()) {
result = new RpcResult(me.getCause());
result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation);
} else {
throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

import org.junit.jupiter.api.Assertions;
Expand All @@ -48,7 +48,7 @@ public class StickyTest {
private Invoker<StickyTest> invoker2 = mock(Invoker.class);
private RpcInvocation invocation;
private Directory<StickyTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();
private StickyClusterInvoker<StickyTest> clusterinvoker = null;
private URL url = URL.valueOf("test://test:11/test?"
+ "&loadbalance=roundrobin"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public Map<String, String> getAttachments() {
return attachments;
}

@Override
public void setAttachment(String key, String value) {

}

@Override
public void setAttachmentIfAbsent(String key, String value) {

}

public Invoker<?> getInvoker() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.RouterFactory;
Expand Down Expand Up @@ -52,7 +52,7 @@ public class FileRouterEngineTest {
Invoker<FileRouterEngineTest> invoker2 = mock(Invoker.class);
Invocation invocation;
StaticDirectory<FileRouterEngineTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();
private RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();

@BeforeAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.filter.DemoService;

Expand All @@ -48,7 +48,7 @@ public class FailSafeClusterInvokerTest {
Invoker<DemoService> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<DemoService> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.DubboAppender;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.apache.log4j.Level;
Expand Down Expand Up @@ -59,7 +59,7 @@ public class FailbackClusterInvokerTest {
Invoker<FailbackClusterInvokerTest> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailbackClusterInvokerTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand All @@ -47,7 +47,7 @@ public class FailfastClusterInvokerTest {
Invoker<FailfastClusterInvokerTest> invoker1 = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailfastClusterInvokerTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class FailoverClusterInvokerTest {
private Invoker<FailoverClusterInvokerTest> invoker2 = mock(Invoker.class);
private RpcInvocation invocation = new RpcInvocation();
private Directory<FailoverClusterInvokerTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -50,7 +50,7 @@ public class ForkingClusterInvokerTest {
private Invoker<ForkingClusterInvokerTest> invoker3 = mock(Invoker.class);
private RpcInvocation invocation = new RpcInvocation();
private Directory<ForkingClusterInvokerTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();

@BeforeEach
public void setUp() throws Exception {
Expand Down
Loading

0 comments on commit ebafc18

Please sign in to comment.