Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge async changes in 3.x to 2.7 #3997

Merged
merged 31 commits into from
May 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1f52668
Async optimization (#3738)
chickenlj Apr 12, 2019
596b5d5
resolve conflicts
chickenlj May 8, 2019
8143d4e
Rename some variables to avoid possible confusion between Result and …
chickenlj Apr 19, 2019
64081c6
Result implements future and keep Filter backward compatibility. (#3916)
chickenlj Apr 24, 2019
670690e
get CompletableFuture before thenApply
chickenlj Apr 24, 2019
0d0c991
rename property to avoid possible confusion
chickenlj Apr 24, 2019
db962a1
add comment
chickenlj Apr 24, 2019
5971410
add comment
chickenlj Apr 24, 2019
d9676b8
add comment
ralf0131 May 20, 2018
5c90c8f
add comment
chickenlj Apr 26, 2019
76862a6
Merge branch 'master' into consumer-thread-isolation
chickenlj May 8, 2019
93688fd
resolve conflicts when merging async from 3.x
chickenlj May 8, 2019
82ffe21
keep dubbo-demo unchanged
chickenlj May 8, 2019
2e9e367
delete PropertiesConfigurationTest, does not exit in master branch be…
chickenlj May 13, 2019
e02d099
Merge branch 'master' into consumer-thread-isolation
chickenlj May 13, 2019
de320ba
merge master to resolve conflicts
chickenlj May 14, 2019
7381887
Merge branch 'master' into consumer-thread-isolation
chickenlj May 14, 2019
09f2047
merge master to resolve conflicts
chickenlj May 14, 2019
3194ef7
remove unused imports
chickenlj May 14, 2019
2634859
remove unused imports
chickenlj May 14, 2019
2869f6d
remove unused imports
chickenlj May 15, 2019
5738a75
Merge branch 'master' into consumer-thread-isolation
chickenlj May 16, 2019
b57d818
remove unused imports
chickenlj May 16, 2019
32a3d07
remove unused imports
chickenlj May 16, 2019
2b8ce01
Fix but, replace AppResponse with AsyncRpcResult
chickenlj May 16, 2019
56953eb
refactor doRefer in XmlRpcProtocol extension to getFrameProxy
chickenlj May 16, 2019
00aa992
reuse doRefer to reduce backward influences
chickenlj May 16, 2019
3615281
Merge branch 'master' into consumer-thread-isolation
chickenlj May 17, 2019
fc797e2
remove unused imports after merge master
chickenlj May 17, 2019
fa1c373
Fix issues find during review.
chickenlj May 22, 2019
b155d93
add UT
chickenlj May 22, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
import org.apache.dubbo.common.timer.Timer;
import org.apache.dubbo.common.timer.TimerTask;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

Expand Down Expand Up @@ -103,7 +103,7 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
addFailed(loadbalance, invocation, invokers, invoker);
return new RpcResult(); // ignore
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
chickenlj marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@

import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;

import java.util.List;

/**
* When invoke fails, log the error message and ignore this error by returning an empty RpcResult.
* When invoke fails, log the error message and ignore this error by returning an empty Result.
* Usually used to write audit logs and other operations
*
* <a href="http://en.wikipedia.org/wiki/Fail-safe">Fail-safe</a>
Expand All @@ -50,7 +50,7 @@ public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBal
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult(); // ignore
return AsyncRpcResult.newDefaultAsyncResult(null, null, invocation); // ignore
chickenlj marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;

/**
* NOTICE! This implementation does not work well with async call.
*
* Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources.
*
* <a href="http://en.wikipedia.org/wiki/Fork_(topology)">Fork</a>
Expand Down Expand Up @@ -70,7 +72,6 @@ public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, L
} else {
selected = new ArrayList<>();
for (int i = 0; i < forks; i++) {
// TODO. Add some comment here, refer chinese version for more details.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if (!selected.contains(invoker)) {
//Avoid add the same invoker several times.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ConfigUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.Merger;
Expand All @@ -40,17 +40,16 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
import static org.apache.dubbo.rpc.Constants.MERGER_KEY;

/**
* @param <T>
*/
@SuppressWarnings("unchecked")
public class MergeableClusterInvoker<T> extends AbstractClusterInvoker<T> {

Expand Down Expand Up @@ -90,26 +89,21 @@ protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, Load
returnType = null;
}

Map<String, Future<Result>> results = new HashMap<String, Future<Result>>();
Map<String, Result> results = new HashMap<>();
for (final Invoker<T> invoker : invokers) {
Future<Result> future = executor.submit(new Callable<Result>() {
@Override
public Result call() throws Exception {
return invoker.invoke(new RpcInvocation(invocation, invoker));
}
});
results.put(invoker.getUrl().getServiceKey(), future);
RpcInvocation subInvocation = new RpcInvocation(invocation, invoker);
subInvocation.setAttachment(ASYNC_KEY, "true");
results.put(invoker.getUrl().getServiceKey(), invoker.invoke(subInvocation));
}

Object result = null;

List<Result> resultList = new ArrayList<Result>(results.size());

int timeout = getUrl().getMethodParameter(invocation.getMethodName(), TIMEOUT_KEY, DEFAULT_TIMEOUT);
for (Map.Entry<String, Future<Result>> entry : results.entrySet()) {
Future<Result> future = entry.getValue();
for (Map.Entry<String, Result> entry : results.entrySet()) {
Result asyncResult = entry.getValue();
try {
Result r = future.get(timeout, TimeUnit.MILLISECONDS);
Result r = asyncResult.get();
chickenlj marked this conversation as resolved.
Show resolved Hide resolved
if (r.hasException()) {
log.error("Invoke " + getGroupDescFromServiceKey(entry.getKey()) +
" failed: " + r.getException().getMessage(),
Expand All @@ -123,13 +117,13 @@ public Result call() throws Exception {
}

if (resultList.isEmpty()) {
return new RpcResult((Object) null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else if (resultList.size() == 1) {
return resultList.iterator().next();
}

if (returnType == void.class) {
return new RpcResult((Object) null);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}

if (merger.startsWith(".")) {
Expand Down Expand Up @@ -177,7 +171,7 @@ public Result call() throws Exception {
throw new RpcException("There is no merger to merge result.");
}
}
return new RpcResult(result);
return AsyncRpcResult.newDefaultAsyncResult(result, invocation);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.support.MockInvoker;

Expand Down Expand Up @@ -115,7 +115,7 @@ private Result doMockInvoke(Invocation invocation, RpcException e) {
result = minvoker.invoke(invocation);
} catch (RpcException me) {
if (me.isBiz()) {
result = new RpcResult(me.getCause());
result = AsyncRpcResult.newDefaultAsyncResult(me.getCause(), invocation);
} else {
throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker;

import org.junit.jupiter.api.Assertions;
Expand All @@ -48,7 +48,7 @@ public class StickyTest {
private Invoker<StickyTest> invoker2 = mock(Invoker.class);
private RpcInvocation invocation;
private Directory<StickyTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();
private StickyClusterInvoker<StickyTest> clusterinvoker = null;
private URL url = URL.valueOf("test://test:11/test?"
+ "&loadbalance=roundrobin"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ public Map<String, String> getAttachments() {
return attachments;
}

@Override
public void setAttachment(String key, String value) {

}

@Override
public void setAttachmentIfAbsent(String key, String value) {

}

public Invoker<?> getInvoker() {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.RouterFactory;
Expand Down Expand Up @@ -52,7 +52,7 @@ public class FileRouterEngineTest {
Invoker<FileRouterEngineTest> invoker2 = mock(Invoker.class);
Invocation invocation;
StaticDirectory<FileRouterEngineTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();
private RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();

@BeforeAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.filter.DemoService;

Expand All @@ -48,7 +48,7 @@ public class FailSafeClusterInvokerTest {
Invoker<DemoService> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<DemoService> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.DubboAppender;
import org.apache.dubbo.common.utils.LogUtil;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.apache.log4j.Level;
Expand Down Expand Up @@ -59,7 +59,7 @@ public class FailbackClusterInvokerTest {
Invoker<FailbackClusterInvokerTest> invoker = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailbackClusterInvokerTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand All @@ -47,7 +47,7 @@ public class FailfastClusterInvokerTest {
Invoker<FailfastClusterInvokerTest> invoker1 = mock(Invoker.class);
RpcInvocation invocation = new RpcInvocation();
Directory<FailfastClusterInvokerTest> dic;
Result result = new RpcResult();
Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
Expand Down Expand Up @@ -55,7 +55,7 @@ public class FailoverClusterInvokerTest {
private Invoker<FailoverClusterInvokerTest> invoker2 = mock(Invoker.class);
private RpcInvocation invocation = new RpcInvocation();
private Directory<FailoverClusterInvokerTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();

/**
* @throws java.lang.Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.RpcResult;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.cluster.Directory;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -50,7 +50,7 @@ public class ForkingClusterInvokerTest {
private Invoker<ForkingClusterInvokerTest> invoker3 = mock(Invoker.class);
private RpcInvocation invocation = new RpcInvocation();
private Directory<ForkingClusterInvokerTest> dic;
private Result result = new RpcResult();
private Result result = new AppResponse();

@BeforeEach
public void setUp() throws Exception {
Expand Down
Loading