Skip to content

Commit

Permalink
Direct return when the server goes down unnormally. (#2185)
Browse files Browse the repository at this point in the history
* Keep the unfinished request in every channel.
When the server goes down un-normally, return the unfinished requests directly.
The current way is to wait until timeout.

* default method

* direct return unfinished requests when remote inactive or disconnect

* direct return unfinished requests when remote inactive or disconnect

* name and doc

* name and doc

* fix unit test

* fix unit test

* fix unit test

* close heartbeat to fix unit test

* sth test

* sth test

* sth test

* sth test

* close when client close

* fix unit test

* fix unit test

* fix unit test

* ignore remote invoke unit test

* don't clear request when channelInactive

* replace import *

* remove unuse import

* optimize

* optimize

* fix ci failed

* fix ci failed

* fix ci failed

* fix ci failed

* fix ci failed

* fix ci failed

* fix ci failed

* optimize unfinished request keeper

* rebase onto master

* fix review code
  • Loading branch information
carryxyh authored and beiwei30 committed Aug 23, 2018
1 parent 53d34f7 commit c6fd684
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
/**
* Channel. (API/SPI, Prototype, ThreadSafe)
*
*
*
* @see org.apache.dubbo.remoting.Client
* @see org.apache.dubbo.remoting.Server#getChannels()
* @see org.apache.dubbo.remoting.Server#getChannel(InetSocketAddress)
Expand Down Expand Up @@ -73,5 +71,4 @@ public interface Channel extends Endpoint {
* @param key key.
*/
void removeAttribute(String key);

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,4 @@ public interface ExchangeChannel extends Channel {
*/
@Override
void close(int timeout);

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public class Response {
*/
public static final byte SERVER_TIMEOUT = 31;

/**
* channel inactive, directly return the unfinished requests.
*/
public static final byte CHANNEL_INACTIVE = 35;

/**
* request format error.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ public class DefaultFuture implements ResponseFuture {

private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class);

private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<>();

private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();
private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>();

public static final Timer TIME_OUT_TIMER = new HashedWheelTimer(
new NamedThreadFactory("dubbo-future-timeout", true),
Expand Down Expand Up @@ -87,10 +87,21 @@ private static void timeoutCheck(DefaultFuture future) {
TIME_OUT_TIMER.newTimeout(task, future.getTimeout(), TimeUnit.MILLISECONDS);
}

/**
* init a DefaultFuture
* 1.init a DefaultFuture
* 2.timeout check
*
* @param channel channel
* @param request the request
* @param timeout timeout
* @return a new DefaultFuture
*/
public static DefaultFuture newFuture(Channel channel, Request request, int timeout) {
final DefaultFuture defaultFuture = new DefaultFuture(channel, request, timeout);
timeoutCheck(defaultFuture);
return defaultFuture;
final DefaultFuture future = new DefaultFuture(channel, request, timeout);
// timeout check
timeoutCheck(future);
return future;
}

public static DefaultFuture getFuture(long id) {
Expand All @@ -108,6 +119,29 @@ public static void sent(Channel channel, Request request) {
}
}

/**
* close a channel when a channel is inactive
* directly return the unfinished requests.
*
* @param channel channel to close
*/
public static void closeChannel(Channel channel) {
for (long id : CHANNELS.keySet()) {
if (channel.equals(CHANNELS.get(id))) {
DefaultFuture future = getFuture(id);
if (future != null && !future.isDone()) {
Response disconnectResponse = new Response(future.getId());
disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
disconnectResponse.setErrorMessage("Channel " +
channel +
" is inactive. Directly return the unFinished request : " +
future.getRequest());
DefaultFuture.received(channel, disconnectResponse);
}
}
}
}

public static void received(Channel channel, Response response) {
try {
DefaultFuture future = FUTURES.remove(response.getId());
Expand Down Expand Up @@ -212,6 +246,7 @@ public void run(Timeout timeout) {
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
// handle response.
DefaultFuture.received(future.getChannel(), timeoutResponse);

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public void disconnected(Channel channel) throws RemotingException {
try {
handler.disconnected(exchangeChannel);
} finally {
DefaultFuture.closeChannel(channel);
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,4 @@ public void send(Object message, boolean sent) throws RemotingException {
public String toString() {
return getLocalAddress() + " -> " + getRemoteAddress();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ public void test_Lazy_ChannelReadOnly() throws Exception {

}
//invoke method --> init client

IDemoService service = (IDemoService) proxy.getProxy(invoker);
Assert.assertEquals("ok", service.get());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,31 @@
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoService;
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoServiceImpl;

import junit.framework.TestCase;
import org.junit.Assert;
import org.junit.Test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class MultiThreadTest extends TestCase {
public class MultiThreadTest {

private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

@Test
public void testDubboMultiThreadInvoke() throws Exception {
Exporter<?> rpcExporter = protocol.export(proxy.getInvoker(new DemoServiceImpl(), DemoService.class, URL.valueOf("dubbo://127.0.0.1:20259/TestService")));

final AtomicInteger counter = new AtomicInteger();
final DemoService service = proxy.getProxy(protocol.refer(DemoService.class, URL.valueOf("dubbo://127.0.0.1:20259/TestService")));
assertEquals(service.getSize(new String[]{"123", "456", "789"}), 3);
Assert.assertEquals(service.getSize(new String[]{"123", "456", "789"}), 3);

final StringBuffer sb = new StringBuffer();
for (int i = 0; i < 1024 * 64 + 32; i++)
sb.append('A');
assertEquals(sb.toString(), service.echo(sb.toString()));
Assert.assertEquals(sb.toString(), service.echo(sb.toString()));

ExecutorService exec = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
Expand All @@ -55,7 +56,7 @@ public void testDubboMultiThreadInvoke() throws Exception {
public void run() {
for (int i = 0; i < 30; i++) {
System.out.println(fi + ":" + counter.getAndIncrement());
assertEquals(service.echo(sb.toString()), sb.toString());
Assert.assertEquals(service.echo(sb.toString()), sb.toString());
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ public void test_counter_error() {
Assert.assertEquals("should not warning message", 0, LogUtil.findMessage(errorMsg));
// counter is incorrect, invocation still succeeds
client.close();

// wait close done.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Assert.fail();
}

Assert.assertEquals("hello", helloService.hello());
Assert.assertEquals("should warning message", 1, LogUtil.findMessage(errorMsg));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,25 @@
import org.apache.dubbo.rpc.protocol.dubbo.support.DemoServiceImpl;
import org.apache.dubbo.rpc.service.EchoService;

import junit.framework.TestCase;
import org.junit.Assert;
import org.junit.Test;

public class RpcFilterTest extends TestCase {
public class RpcFilterTest {
private Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

@Test
public void testRpcFilter() throws Exception {
DemoService service = new DemoServiceImpl();
URL url = URL.valueOf("dubbo://127.0.0.1:9010/org.apache.dubbo.rpc.DemoService?service.filter=echo");
protocol.export(proxy.getInvoker(service, DemoService.class, url));
service = proxy.getProxy(protocol.refer(DemoService.class, url));
assertEquals("123", service.echo("123"));
Assert.assertEquals("123", service.echo("123"));
// cast to EchoService
EchoService echo = proxy.getProxy(protocol.refer(EchoService.class, url));
assertEquals(echo.$echo("test"), "test");
assertEquals(echo.$echo("abcdefg"), "abcdefg");
assertEquals(echo.$echo(1234), 1234);
Assert.assertEquals(echo.$echo("test"), "test");
Assert.assertEquals(echo.$echo("abcdefg"), "abcdefg");
Assert.assertEquals(echo.$echo(1234), 1234);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TTransport;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import java.io.ByteArrayInputStream;
Expand Down

0 comments on commit c6fd684

Please sign in to comment.