diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java index 322d8d97736..a0fc0843ac1 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java @@ -19,9 +19,11 @@ import org.apache.dubbo.common.logger.Logger; import org.apache.dubbo.common.logger.LoggerFactory; +import java.util.Collections; import java.util.List; import java.util.concurrent.AbstractExecutorService; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -42,6 +44,8 @@ public class ThreadlessExecutor extends AbstractExecutorService { private ExecutorService sharedExecutor; + private CompletableFuture waitingFuture; + private volatile boolean waiting = true; private final Object lock = new Object(); @@ -50,6 +54,14 @@ public ThreadlessExecutor(ExecutorService sharedExecutor) { this.sharedExecutor = sharedExecutor; } + public CompletableFuture getWaitingFuture() { + return waitingFuture; + } + + public void setWaitingFuture(CompletableFuture waitingFuture) { + this.waitingFuture = waitingFuture; + } + public boolean isWaiting() { return waiting; } @@ -113,9 +125,10 @@ public void execute(Runnable runnable) { /** * tells the thread blocking on {@link #waitAndDrain()} to return, despite of the current status, to avoid endless waiting. */ - public void notifyReturn() { + public void notifyReturn(Throwable t) { // an empty runnable task. execute(() -> { + waitingFuture.completeExceptionally(t); }); } @@ -125,12 +138,14 @@ public void notifyReturn() { @Override public void shutdown() { - + shutdownNow(); } @Override public List shutdownNow() { - return null; + notifyReturn(new IllegalStateException("Consumer is shutting down and this call is going to be stopped without " + + "receiving any result, usually this is called by a slow provider instance or bad service implementation.")); + return Collections.emptyList(); } @Override diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java index f0aab3c91a9..dd37bffa19f 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java @@ -64,6 +64,12 @@ public DefaultExecutorRepository() { serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler")); } + /** + * Get called when the server or client instance initiating. + * + * @param url + * @return + */ public synchronized ExecutorService createExecutorIfAbsent(URL url) { String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY; if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) { @@ -87,7 +93,14 @@ public ExecutorService getExecutor(URL url) { componentKey = CONSUMER_SIDE; } Map executors = data.get(componentKey); + + /** + * It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already + * have Executor instances generated and stored. + */ if (executors == null) { + logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " + + "before coming to here."); return null; } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index aeaff45da9c..2fe1eef6ceb 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -107,6 +107,10 @@ private static void timeoutCheck(DefaultFuture future) { public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) { final DefaultFuture future = new DefaultFuture(channel, request, timeout); future.setExecutor(executor); + // ThreadlessExecutor needs to hold the waiting future in case of circuit return. + if (executor instanceof ThreadlessExecutor) { + ((ThreadlessExecutor) executor).setWaitingFuture(future); + } // timeout check timeoutCheck(future); return future; @@ -138,6 +142,11 @@ public static void closeChannel(Channel channel) { if (channel.equals(entry.getValue())) { DefaultFuture future = getFuture(entry.getKey()); if (future != null && !future.isDone()) { + ExecutorService futureExecutor = future.getExecutor(); + if (futureExecutor != null && !futureExecutor.isTerminated()) { + futureExecutor.shutdownNow(); + } + Response disconnectResponse = new Response(future.getId()); disconnectResponse.setStatus(Response.CHANNEL_INACTIVE); disconnectResponse.setErrorMessage("Channel " + @@ -208,7 +217,8 @@ private void doReceived(Response res) { if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; if (threadlessExecutor.isWaiting()) { - threadlessExecutor.notifyReturn(); + threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" + + " which is not an expected state, interrupt the thread manually by returning an exception.")); } } } @@ -271,16 +281,20 @@ public void run(Timeout timeout) { return; } if (future.getExecutor() != null) { - future.getExecutor().execute(() -> { - // create exception response. - Response timeoutResponse = new Response(future.getId()); - // set timeout status. - timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); - timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); - // handle response. - DefaultFuture.received(future.getChannel(), timeoutResponse, true); - }); + future.getExecutor().execute(() -> notifyTimeout(future)); + } else { + notifyTimeout(future); } } + + private void notifyTimeout(DefaultFuture future) { + // create exception response. + Response timeoutResponse = new Response(future.getId()); + // set timeout status. + timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); + timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); + // handle response. + DefaultFuture.received(future.getChannel(), timeoutResponse, true); + } } } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java index cb73c039483..5a6e169419b 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java @@ -96,8 +96,9 @@ public void setValue(Object value) { responseFuture.complete(appResponse); } } catch (Exception e) { - // This should never happen; - logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e); + // This should not happen in normal request process; + logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult."); + throw new RpcException(e); } } @@ -117,8 +118,9 @@ public void setException(Throwable t) { responseFuture.complete(appResponse); } } catch (Exception e) { - // This should never happen; - logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e); + // This should not happen in normal request process; + logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult."); + throw new RpcException(e); } } @@ -141,8 +143,9 @@ public Result getAppResponse() { return responseFuture.get(); } } catch (Exception e) { - // This should never happen; - logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e); + // This should not happen in normal request process; + logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult."); + throw new RpcException(e); } return new AppResponse(); } @@ -158,7 +161,7 @@ public Result getAppResponse() { */ @Override public Result get() throws InterruptedException, ExecutionException { - if (executor != null) { + if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; threadlessExecutor.waitAndDrain(); } @@ -167,7 +170,7 @@ public Result get() throws InterruptedException, ExecutionException { @Override public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - if (executor != null) { + if (executor != null && executor instanceof ThreadlessExecutor) { ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor; threadlessExecutor.waitAndDrain(); } diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java index 0b285329b38..bc381d2995b 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java @@ -53,6 +53,11 @@ public Result invoke(Invocation invocation) throws RpcException { try { if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) { + /** + * NOTICE! + * must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because + * {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop. + */ asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS); } } catch (InterruptedException e) {