Skip to content

Commit

Permalink
polish the code for pull request 2658: Optimize heartbeat and reconne…
Browse files Browse the repository at this point in the history
…ct task (apache#2709)
  • Loading branch information
beiwei30 authored and CrazyHZM committed Dec 6, 2018
1 parent 61c0607 commit bcc6150
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 59 deletions.
12 changes: 10 additions & 2 deletions dubbo-common/src/main/java/org/apache/dubbo/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,17 @@ public class Constants {

public static final String HEARTBEAT_KEY = "heartbeat";

public static final int HEARTBEAT_TICK = 3;
/**
* Every heartbeat duration / HEATBEAT_CHECK_TICK, check if a heartbeat should be sent. Every heartbeat timeout
* duration / HEATBEAT_CHECK_TICK, check if a connection should be closed on server side, and if reconnect on
* client side
*/
public static final int HEARTBEAT_CHECK_TICK = 3;

public static final long LEAST_HEARTBEAT_TICK = 1000;
/**
* the least heartbeat during is 1000 ms.
*/
public static final long LEAST_HEARTBEAT_DURATION = 1000;

/**
* ticks per wheel. Currently only contains two tasks, so 16 locations are enough
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,43 +30,40 @@
*/
public abstract class AbstractTimerTask implements TimerTask {

protected final ChannelProvider channelProvider;
private final ChannelProvider channelProvider;

protected final Long tick;
private final Long tick;

protected AbstractTimerTask(ChannelProvider channelProvider, Long tick) {
AbstractTimerTask(ChannelProvider channelProvider, Long tick) {
if (channelProvider == null || tick == null) {
throw new IllegalArgumentException();
}
this.tick = tick;
this.channelProvider = channelProvider;
}

protected Long lastRead(Channel channel) {
return (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_READ_TIMESTAMP);
static Long lastRead(Channel channel) {
return (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
}

protected Long lastWrite(Channel channel) {
return (Long) channel.getAttribute(
HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
static Long lastWrite(Channel channel) {
return (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
}

protected Long now() {
static Long now() {
return System.currentTimeMillis();
}

protected void reput(Timeout timeout, Long tick) {
private void reput(Timeout timeout, Long tick) {
if (timeout == null || tick == null) {
throw new IllegalArgumentException();
}

Timer timer = timeout.timer();
if (timer.isStop()) {
return;
}
if (timeout.isCancelled()) {
if (timer.isStop() || timeout.isCancelled()) {
return;
}

timer.newTimeout(timeout.task(), tick, TimeUnit.MILLISECONDS);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,17 @@ public HeaderExchangeClient(Client client, boolean needHeartbeat) {
this.client = client;
this.channel = new HeaderExchangeChannel(client);
String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);

this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null &&
dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
if (heartbeatTimeout < heartbeat * 2) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}

if (needHeartbeat) {
long heartbeatTick = calcLeastTick(heartbeat);

// use heartbeatTick as every tick.
heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
long tickDuration = calculateLeastDuration(heartbeat);
heartbeatTimer = new HashedWheelTimer(tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}
}
Expand Down Expand Up @@ -179,8 +179,8 @@ public boolean hasAttribute(String key) {
private void startHeartbeatTimer() {
AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this);

long heartbeatTick = calcLeastTick(heartbeat);
long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout);
long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);

Expand All @@ -203,11 +203,11 @@ private void doClose() {
/**
* Each interval cannot be less than 1000ms.
*/
private long calcLeastTick(int time) {
if (time / Constants.HEARTBEAT_TICK <= 0) {
return Constants.LEAST_HEARTBEAT_TICK;
private long calculateLeastDuration(int time) {
if (time / Constants.HEARTBEAT_CHECK_TICK <= 0) {
return Constants.LEAST_HEARTBEAT_DURATION;
} else {
return time / Constants.HEARTBEAT_TICK;
return time / Constants.HEARTBEAT_CHECK_TICK;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,11 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Collections.unmodifiableCollection;

/**
* ExchangeServerImpl
*/
Expand All @@ -63,9 +64,6 @@ public HeaderExchangeServer(Server server) {
throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
}

long heartbeatTick = calcLeastTick(heartbeat);
// use heartbeatTick as every tick.
heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}

Expand Down Expand Up @@ -217,10 +215,6 @@ public void reset(URL url) {
heartbeatTimeout = t;

stopHeartbeatTimer();

long heartbeatTick = calcLeastTick(heartbeat);
// use heartbeatTick as every tick.
heartbeatTimer = new HashedWheelTimer(heartbeatTick, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);
startHeartbeatTimer();
}
}
Expand All @@ -238,24 +232,40 @@ public void reset(org.apache.dubbo.common.Parameters parameters) {
@Override
public void send(Object message) throws RemotingException {
if (closed.get()) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message
+ ", cause: The server " + getLocalAddress() + " is closed!");
}
server.send(message);
}

@Override
public void send(Object message, boolean sent) throws RemotingException {
if (closed.get()) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message
+ ", cause: The server " + getLocalAddress() + " is closed!");
}
server.send(message, sent);
}

/**
* Each interval cannot be less than 1000ms.
*/
private long calculateLeastDuration(int time) {
if (time / Constants.HEARTBEAT_CHECK_TICK <= 0) {
return Constants.LEAST_HEARTBEAT_DURATION;
} else {
return time / Constants.HEARTBEAT_CHECK_TICK;
}
}

private void startHeartbeatTimer() {
AbstractTimerTask.ChannelProvider cp = () -> Collections.unmodifiableCollection(HeaderExchangeServer.this.getChannels());
long tickDuration = calculateLeastDuration(heartbeat);
heartbeatTimer = new HashedWheelTimer(tickDuration, TimeUnit.MILLISECONDS, Constants.TICKS_PER_WHEEL);

long heartbeatTick = calcLeastTick(heartbeat);
long heartbeatTimeoutTick = calcLeastTick(heartbeatTimeout);
AbstractTimerTask.ChannelProvider cp = () -> unmodifiableCollection(HeaderExchangeServer.this.getChannels());

long heartbeatTick = calculateLeastDuration(heartbeat);
long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);
HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);
ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout);

Expand All @@ -264,17 +274,6 @@ private void startHeartbeatTimer() {
heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS);
}

/**
* Each interval cannot be less than 1000ms.
*/
private long calcLeastTick(int time) {
if (time / Constants.HEARTBEAT_TICK <= 0) {
return Constants.LEAST_HEARTBEAT_TICK;
} else {
return time / Constants.HEARTBEAT_TICK;
}
}

private void stopHeartbeatTimer() {
if (heartbeatTimer != null) {
heartbeatTimer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class HeartbeatTimerTask extends AbstractTimerTask {

private final int heartbeat;

protected HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) {
HeartbeatTimerTask(ChannelProvider channelProvider, Long heartbeatTick, int heartbeat) {
super(channelProvider, heartbeatTick);
this.heartbeat = heartbeat;
}
Expand All @@ -51,7 +51,8 @@ protected void doTask(Channel channel) {
channel.send(req);
if (logger.isDebugEnabled()) {
logger.debug("Send heartbeat to remote channel " + channel.getRemoteAddress()
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: " + heartbeat + "ms");
+ ", cause: The channel has no data-transmission exceeds a heartbeat period: "
+ heartbeat + "ms");
}
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class ReconnectTimerTask extends AbstractTimerTask {

private final int heartbeatTimeout;

protected ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) {
ReconnectTimerTask(ChannelProvider channelProvider, Long heartbeatTimeoutTick, int heartbeatTimeout1) {
super(channelProvider, heartbeatTimeoutTick);
this.heartbeatTimeout = heartbeatTimeout1;
}
Expand All @@ -42,8 +42,8 @@ protected void doTask(Channel channel) {
Long lastRead = lastRead(channel);
Long now = now();
if (lastRead != null && now - lastRead > heartbeatTimeout) {
logger.warn("Close channel " + channel
+ ", because heartbeat read idle time out: " + heartbeatTimeout + "ms");
logger.warn("Close channel " + channel + ", because heartbeat read idle time out: "
+ heartbeatTimeout + "ms");
if (channel instanceof Client) {
try {
((Client) channel).reconnect();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class HeartBeatTaskTest {
@Before
public void setup() throws Exception {
long tickDuration = 1000;
heartbeatTimer = new HashedWheelTimer(tickDuration / Constants.HEARTBEAT_TICK, TimeUnit.MILLISECONDS);
heartbeatTimer = new HashedWheelTimer(tickDuration / Constants.HEARTBEAT_CHECK_TICK, TimeUnit.MILLISECONDS);

channel = new MockChannel() {

Expand All @@ -53,7 +53,7 @@ public URL getUrl() {
};

AbstractTimerTask.ChannelProvider cp = () -> Collections.<Channel>singletonList(channel);
heartbeatTimerTask = new HeartbeatTimerTask(cp, tickDuration / Constants.HEARTBEAT_TICK, (int) tickDuration);
heartbeatTimerTask = new HeartbeatTimerTask(cp, tickDuration / Constants.HEARTBEAT_CHECK_TICK, (int) tickDuration);
}

@Test
Expand Down

0 comments on commit bcc6150

Please sign in to comment.