Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhance consumer side thread model: threadless executor #5490

Merged
merged 2 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand All @@ -42,6 +44,8 @@ public class ThreadlessExecutor extends AbstractExecutorService {

private ExecutorService sharedExecutor;

private CompletableFuture<?> waitingFuture;

private volatile boolean waiting = true;

private final Object lock = new Object();
Expand All @@ -50,6 +54,14 @@ public ThreadlessExecutor(ExecutorService sharedExecutor) {
this.sharedExecutor = sharedExecutor;
}

public CompletableFuture<?> getWaitingFuture() {
return waitingFuture;
}

public void setWaitingFuture(CompletableFuture<?> waitingFuture) {
this.waitingFuture = waitingFuture;
}

public boolean isWaiting() {
return waiting;
}
Expand Down Expand Up @@ -113,9 +125,10 @@ public void execute(Runnable runnable) {
/**
* tells the thread blocking on {@link #waitAndDrain()} to return, despite of the current status, to avoid endless waiting.
*/
public void notifyReturn() {
public void notifyReturn(Throwable t) {
// an empty runnable task.
execute(() -> {
waitingFuture.completeExceptionally(t);
});
}

Expand All @@ -125,12 +138,14 @@ public void notifyReturn() {

@Override
public void shutdown() {

shutdownNow();
}

@Override
public List<Runnable> shutdownNow() {
return null;
notifyReturn(new IllegalStateException("Consumer is shutting down and this call is going to be stopped without " +
"receiving any result, usually this is called by a slow provider instance or bad service implementation."));
return Collections.emptyList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public DefaultExecutorRepository() {
serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
}

/**
* Get called when the server or client instance initiating.
*
* @param url
* @return
*/
public synchronized ExecutorService createExecutorIfAbsent(URL url) {
String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
Expand All @@ -87,7 +93,14 @@ public ExecutorService getExecutor(URL url) {
componentKey = CONSUMER_SIDE;
}
Map<Integer, ExecutorService> executors = data.get(componentKey);

/**
* It's guaranteed that this method is called after {@link #createExecutorIfAbsent(URL)}, so data should already
* have Executor instances generated and stored.
*/
if (executors == null) {
logger.warn("No available executors, this is not expected, framework should call createExecutorIfAbsent first " +
"before coming to here.");
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ private static void timeoutCheck(DefaultFuture future) {
public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) {
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
future.setExecutor(executor);
// ThreadlessExecutor needs to hold the waiting future in case of circuit return.
if (executor instanceof ThreadlessExecutor) {
((ThreadlessExecutor) executor).setWaitingFuture(future);
}
// timeout check
timeoutCheck(future);
return future;
Expand Down Expand Up @@ -138,6 +142,11 @@ public static void closeChannel(Channel channel) {
if (channel.equals(entry.getValue())) {
DefaultFuture future = getFuture(entry.getKey());
if (future != null && !future.isDone()) {
ExecutorService futureExecutor = future.getExecutor();
if (futureExecutor != null && !futureExecutor.isTerminated()) {
futureExecutor.shutdownNow();
}

Response disconnectResponse = new Response(future.getId());
disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
disconnectResponse.setErrorMessage("Channel " +
Expand Down Expand Up @@ -208,7 +217,8 @@ private void doReceived(Response res) {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
if (threadlessExecutor.isWaiting()) {
threadlessExecutor.notifyReturn();
threadlessExecutor.notifyReturn(new IllegalStateException("The result has returned, but the biz thread is still waiting" +
" which is not an expected state, interrupt the thread manually by returning an exception."));
}
}
}
Expand Down Expand Up @@ -271,16 +281,20 @@ public void run(Timeout timeout) {
return;
}
if (future.getExecutor() != null) {
future.getExecutor().execute(() -> {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse, true);
});
future.getExecutor().execute(() -> notifyTimeout(future));
} else {
notifyTimeout(future);
}
}

private void notifyTimeout(DefaultFuture future) {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse, true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,9 @@ public void setValue(Object value) {
responseFuture.complete(appResponse);
}
} catch (Exception e) {
// This should never happen;
logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e);
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
}

Expand All @@ -117,8 +118,9 @@ public void setException(Throwable t) {
responseFuture.complete(appResponse);
}
} catch (Exception e) {
// This should never happen;
logger.error("Got exception when trying to change the value of the underlying result from AsyncRpcResult.", e);
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
}

Expand All @@ -141,8 +143,9 @@ public Result getAppResponse() {
return responseFuture.get();
}
} catch (Exception e) {
// This should never happen;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e);
// This should not happen in normal request process;
logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.");
throw new RpcException(e);
}
return new AppResponse();
}
Expand All @@ -158,7 +161,7 @@ public Result getAppResponse() {
*/
@Override
public Result get() throws InterruptedException, ExecutionException {
if (executor != null) {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
threadlessExecutor.waitAndDrain();
}
Expand All @@ -167,7 +170,7 @@ public Result get() throws InterruptedException, ExecutionException {

@Override
public Result get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (executor != null) {
if (executor != null && executor instanceof ThreadlessExecutor) {
ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) executor;
threadlessExecutor.waitAndDrain();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public Result invoke(Invocation invocation) throws RpcException {

try {
if (InvokeMode.SYNC == ((RpcInvocation) invocation).getInvokeMode()) {
/**
* NOTICE!
* must call {@link java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
* {@link java.util.concurrent.CompletableFuture#get()} was proved to have serious performance drop.
*/
asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
Expand Down