Skip to content

Commit

Permalink
Enhance consumer side thread model: threadless executor (#5490)
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj authored Dec 17, 2019
1 parent 2511a85 commit 5f8ac2b
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +44,8 @@ public class ThreadlessExecutor extends AbstractExecutorService {

private ExecutorService sharedExecutor;

private CompletableFuture<?> waitingFuture;

private volatile boolean waiting = true;

private final Object lock = new Object();
Expand All @@ -50,6 +54,14 @@ public ThreadlessExecutor(ExecutorService sharedExecutor) {
this.sharedExecutor = sharedExecutor;
}

public CompletableFuture<?> getWaitingFuture() {
return waitingFuture;
}

public void setWaitingFuture(CompletableFuture<?> waitingFuture) {
this.waitingFuture = waitingFuture;
}

public boolean isWaiting() {
return waiting;
}
Expand Down Expand Up @@ -113,9 +125,10 @@ public void execute(Runnable runnable) {
/**
* tells the thread blocking on {@link #waitAndDrain()} to return, despite of the current status, to avoid endless waiting.
*/
public void notifyReturn() {
public void notifyReturn(Throwable t) {
// an empty runnable task.
execute(() -> {
waitingFuture.completeExceptionally(t);
});
}

Expand All @@ -125,12 +138,14 @@ public void notifyReturn() {

@Override
public void shutdown() {

shutdownNow();
}

@Override
public List<Runnable> shutdownNow() {
return null;
notifyReturn(new IllegalStateException("Consumer is shutting down and this call is going to be stopped without " +
"receiving any result, usually this is called by a slow provider instance or bad service implementation."));
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public DefaultExecutorRepository() {
serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
}

/**
* Get called when the server or client instance initiating.
*
* @param url
* @return
*/
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
Expand All @@ -87,7 +93,14 @@ public ExecutorService getExecutor(URL url) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.get(componentKey);

/**
* It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
* have Executor instances generated and stored.
*/
if (executors == null) {
logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
"before coming to here.");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ private static void timeoutCheck(DefaultFuture future) {
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
future.setExecutor(executor);
// ThreadlessExecutor needs to hold the waiting future in case of circuit return.
if (executor instanceof ThreadlessExecutor) {
((ThreadlessExecutor) executor).setWaitingFuture(future);
}
// timeout check
timeoutCheck(future);
return future;
Expand Down Expand Up @@ -138,6 +142,11 @@ public static void closeChannel(Channel channel) {
if (channel.equals(entry.getValue())) {
DefaultFuture future = getFuture(entry.getKey());
if (future != null && !future.isDone()) {
ExecutorService futureExecutor = future.getExecutor();
if (futureExecutor != null && !futureExecutor.isTerminated()) {
futureExecutor.shutdownNow();
}

Response disconnectResponse = new Response(future.getId());
disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
disconnectResponse.setErrorMessage("Channel " +
Expand Down Expand Up @@ -208,7 +217,8 @@ private void doReceived(Response res) {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
if (threadlessExecutor.isWaiting()) {
threadlessExecutor.notifyReturn();
threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
" which is not an expected state, interrupt the thread manually by returning an exception."));
}
}
}
Expand Down Expand Up @@ -271,16 +281,20 @@ public void run(Timeout timeout) {
return;
}
if (future.getExecutor() != null) {
future.getExecutor().execute(() -> {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse, true);
});
future.getExecutor().execute(() -> notifyTimeout(future));
} else {
notifyTimeout(future);
}
}

private void notifyTimeout(DefaultFuture future) {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse, true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ public void setValue(Object value) {
responseFuture.complete(appResponse);
}
} catch (Exception e) {
// This should never happen;
logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e);
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
}

Expand All @@ -117,8 +118,9 @@ public void setException(Throwable t) {
responseFuture.complete(appResponse);
}
} catch (Exception e) {
// This should never happen;
logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e);
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
}

Expand All @@ -141,8 +143,9 @@ public Result getAppResponse() {
return responseFuture.get();
}
} catch (Exception e) {
// This should never happen;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e);
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
return new AppResponse();
}
Expand All @@ -158,7 +161,7 @@ public Result getAppResponse() {
*/
@Override
public Result get() throws InterruptedException, ExecutionException {
if (executor != null) {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
threadlessExecutor.waitAndDrain();
}
Expand All @@ -167,7 +170,7 @@ public Result get() throws InterruptedException, ExecutionException {

@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (executor != null) {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
threadlessExecutor.waitAndDrain();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public Result invoke(Invocation invocation) throws RpcException {

try {
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
/**
* NOTICE!
* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
*/
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
Expand Down

0 comments on commit 5f8ac2b

Please sign in to comment.