From f7527b38f156f6251f70e57c815a9397b3d43a43 Mon Sep 17 00:00:00 2001 From: absolute8511 Date: Thu, 20 Jul 2023 22:04:55 +0800 Subject: [PATCH 1/2] fix: avoid close success channel if invokeSync most time cost on get channel Change-Id: I29741cf55ac6333bfa30fef755357b78a22b1325 --- .../rocketmq/remoting/netty/NettyRemotingClient.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 9715b918a29..f60b6357253 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -88,6 +88,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); private static final long LOCK_TIMEOUT_MILLIS = 3000; + private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100; private final NettyClientConfig nettyClientConfig; private final Bootstrap bootstrap = new Bootstrap(); @@ -524,13 +525,15 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo final Channel channel = this.getAndCreateChannel(addr); String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); if (channel != null && channel.isActive()) { + long left = timeoutMillis; try { doBeforeRpcHooks(channelRemoteAddr, request); long costTime = System.currentTimeMillis() - beginStartTime; - if (timeoutMillis < costTime) { + left -= costTime; + if (left <= 0) { throw new RemotingTimeoutException("invokeSync call the addr[" + channelRemoteAddr + "] timeout"); } - RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); + RemotingCommand response = this.invokeSyncImpl(channel, request, left); doAfterRpcHooks(channelRemoteAddr, request, response); this.updateChannelLastResponseTime(addr); return response; @@ -539,7 +542,9 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { - if (nettyClientConfig.isClientCloseSocketIfTimeout()) { + // avoid close the success channel if left timeout is small, since it may cost too much time in get the success channel, the left timeout for read is small + boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis/4; + if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) { this.closeChannel(addr, channel); LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr); } From e05b76e11a64d0dc33e6e0a4f6f5537ae18cd5ec Mon Sep 17 00:00:00 2001 From: absolute8511 Date: Fri, 21 Jul 2023 10:08:17 +0800 Subject: [PATCH 2/2] fix: ci style Change-Id: I8c9b86e9cb6f1463bf213e64c9b8c139afa794c8 --- .../org/apache/rocketmq/remoting/netty/NettyRemotingClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index f60b6357253..8491f4354c6 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -543,7 +543,7 @@ public RemotingCommand invokeSync(String addr, final RemotingCommand request, lo throw e; } catch (RemotingTimeoutException e) { // avoid close the success channel if left timeout is small, since it may cost too much time in get the success channel, the left timeout for read is small - boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis/4; + boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left > timeoutMillis / 4; if (nettyClientConfig.isClientCloseSocketIfTimeout() && shouldClose) { this.closeChannel(addr, channel); LOGGER.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, channelRemoteAddr);