Skip to content

Commit

Permalink
Revert "optimize: the high CPU usage issue of the rpcMergeMessageSend…
Browse files Browse the repository at this point in the history
… thread (#6061)" (#6163)
  • Loading branch information
funky-eyes committed Dec 16, 2023
1 parent c0d0445 commit 501d431
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 66 deletions.
2 changes: 0 additions & 2 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6121](https://github.com/seata/seata/pull/6121)] fix the branch transaction order error when rolling back

### optimize:
- [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration
- [[#6031](https://github.com/seata/seata/pull/6031)] add a check for the existence of the undolog table
- [[#6089](https://github.com/seata/seata/pull/6089)] modify the semantics of RaftServerFactory and remove unnecessary singleton
- [[#4473](https://github.com/seata/seata/pull/4473)] rm appdata size limit
Expand All @@ -39,7 +38,6 @@ Thanks to these contributors for their code commits. Please report an unintended

<!-- Please make sure your Github ID is in the list below -->
- [slievrly](https://github.com/slievrly)
- [yiqi](https://github.com/PleaseGiveMeTheCoke)
- [ptyin](https://github.com/ptyin)
- [laywin](https://github.com/laywin)
- [imcmai](https://github.com/imcmai)
Expand Down
2 changes: 0 additions & 2 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
- [[#6121](https://github.com/seata/seata/pull/6121)] 修复回滚分支事务时没有按照时间排序的问题

### optimize:
- [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长
- [[#6031](https://github.com/seata/seata/pull/6031)] 添加undo_log表的存在性校验
- [[#6089](https://github.com/seata/seata/pull/6089)] 修改RaftServerFactory语义并删除不必要的单例构建
- [[#4473](https://github.com/seata/seata/pull/4473)] rm appdata大小限制
Expand All @@ -38,7 +37,6 @@

<!-- 请确保您的 GitHub ID 在以下列表中 -->
- [slievrly](https://github.com/slievrly)
- [yiqi](https://github.com/PleaseGiveMeTheCoke)
- [ptyin](https://github.com/ptyin)
- [laywin](https://github.com/laywin)
- [imcmai](https://github.com/imcmai)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public abstract class AbstractNettyRemoting implements Disposable {
/**
* The Is sending.
*/
protected static volatile boolean isSending = false;
protected volatile boolean isSending = false;
private String group = "DEFAULT";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -79,13 +77,14 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
private static final String MSG_ID_PREFIX = "msgId:";
private static final String FUTURES_PREFIX = "futures:";
private static final String SINGLE_LOG_POSTFIX = ";";
private static final int MAX_MERGE_SEND_MILLS = 10;
private static final int MAX_MERGE_SEND_MILLS = 1;
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
private static final int MAX_MERGE_SEND_THREAD = 1;
private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;
private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
private static final String MERGE_THREAD_NAME = "rpcMergeMessageSend";
protected static final Object MERGE_LOCK = new Object();
private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
protected final Object mergeLock = new Object();

/**
* When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap.
Expand All @@ -97,39 +96,29 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
* Send via asynchronous thread {@link io.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable}
* {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()}
*/
protected static final ConcurrentHashMap<String/*serverAddress*/, Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>>> BASKET_MAP = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();
private final NettyClientBootstrap clientBootstrap;
private final NettyClientChannelManager clientChannelManager;
private final NettyPoolKey.TransactionRole transactionRole;
private static volatile ExecutorService mergeSendExecutorService;
private ExecutorService mergeSendExecutorService;
private TransactionMessageHandler transactionMessageHandler;
protected volatile boolean enableClientBatchSendRequest;

@Override
public void init() {
timerExecutor.scheduleAtFixedRate(() -> clientChannelManager.reconnect(getTransactionServiceGroup()), SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
startMergeSendThread();
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}

private void startMergeSendThread() {
if (mergeSendExecutorService == null) {
synchronized (AbstractNettyRemoting.class) {
if (mergeSendExecutorService == null) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(MERGE_THREAD_NAME, MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
}
}
}

public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
super(messageExecutor);
Expand Down Expand Up @@ -157,14 +146,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
futures.put(rpcMessage.getId(), messageFuture);

// put message into basketMap
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> roleMessage = CollectionUtils.computeIfAbsent(BASKET_MAP, serverAddress,
key -> {
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> map = new HashMap<>(2);
map.put(NettyPoolKey.TransactionRole.TMROLE, new LinkedBlockingQueue<>());
map.put(NettyPoolKey.TransactionRole.RMROLE, new LinkedBlockingQueue<>());
return map;
});
BlockingQueue<RpcMessage> basket = roleMessage.get(transactionRole);
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
Expand All @@ -174,8 +157,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
if (!isSending) {
synchronized (MERGE_LOCK) {
MERGE_LOCK.notifyAll();
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}

Expand Down Expand Up @@ -308,6 +291,10 @@ protected String getXid(Object msg) {
return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid;
}

private String getThreadPrefix() {
return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
}

/**
* Get pool key function.
*
Expand Down Expand Up @@ -339,72 +326,59 @@ protected String getXid(Object msg) {
/**
* The type Merged send runnable.
*/
private static class MergedSendRunnable implements Runnable {
private class MergedSendRunnable implements Runnable {

@Override
public void run() {
while (true) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
synchronized (MERGE_LOCK) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
try {
MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
synchronized (mergeLock) {
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
isSending = true;
BASKET_MAP.forEach((address, roleMessage) -> roleMessage.forEach((role, basket) -> {
basketMap.forEach((address, basket) -> {
if (basket.isEmpty()) {
return;
}

AbstractNettyRemotingClient client;
if (role.equals(NettyPoolKey.TransactionRole.RMROLE)) {
client = RmNettyRemotingClient.getInstance();
} else {
client = TmNettyRemotingClient.getInstance();
}

ConcurrentHashMap<Integer, MessageFuture> clientFutures = client.getFutures();

MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(clientFutures, mergeMessage);
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// send batch message is sync request, but there is no need to get the return value.
// Since the messageFuture has been created before the message is placed in basketMap,
// the return value will be obtained in ClientOnResponseProcessor.
sendChannel = client.getClientChannelManager().acquireChannel(address);
client.sendAsyncRequest(sendChannel, mergeMessage);
sendChannel = clientChannelManager.acquireChannel(address);
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
client.destroyChannel(address, sendChannel);
destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = clientFutures.remove(msgId);
MessageFuture messageFuture = futures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
}));
});
isSending = false;
}
}

private void printMergeMessageLog(ConcurrentHashMap<Integer, MessageFuture> clientFutures, MergedWarpMessage mergeMessage) {
private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size());
for (AbstractMessage cm : mergeMessage.msgs) {
Expand All @@ -415,7 +389,7 @@ private void printMergeMessageLog(ConcurrentHashMap<Integer, MessageFuture> clie
sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
sb.append("\n");
for (long l : clientFutures.keySet()) {
for (long l : futures.keySet()) {
sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
LOGGER.debug(sb.toString());
Expand Down

0 comments on commit 501d431

Please sign in to comment.