Skip to content

Commit

Permalink
fix conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly committed Dec 19, 2023
2 parents b3b29e5 + 2f9957b commit fda95a9
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 74 deletions.
8 changes: 4 additions & 4 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6075](https://github.com/seata/seata/pull/6075)] fix missing table alias for on update column of image SQL
- [[#6086](https://github.com/seata/seata/pull/6086)] fix oracle column alias cannot find
- [[#6085](https://github.com/seata/seata/pull/6085)] fix jdk9+ compile error
- [[#6101](https://github.com/seata/seata/pull/6101)] fix the consumer can't generate tcc proxy in dubbo 3.x version
- [[#6077](https://github.com/seata/seata/pull/6077)] fix could not rollback when table with multiple primary
- [[#6121](https://github.com/seata/seata/pull/6121)] fix the branch transaction order error when rolling back

### optimize:
- [[#6061](https://github.com/seata/seata/pull/6061)] merge the rpcMergeMessageSend threads of rm and tm and increase the thread hibernation duration
- [[#6031](https://github.com/seata/seata/pull/6031)] add a check for the existence of the undolog table
- [[#6089](https://github.com/seata/seata/pull/6089)] modify the semantics of RaftServerFactory and remove unnecessary singleton
- [[#4473](https://github.com/seata/seata/pull/4473)] rm appdata size limit
Expand All @@ -24,11 +24,12 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6098](https://github.com/seata/seata/pull/6098)] optimize the retry logic in the acquireMetadata method
- [[#6034](https://github.com/seata/seata/pull/6034)] using namespace from command line when deployment with helm charts
- [[#6116](https://github.com/seata/seata/pull/6034)] remove lgtm.com stuff

- [[#6145](https://github.com/seata/seata/pull/6145)] upgrade jettison to 1.5.4

### security:
- [[#6069](https://github.com/seata/seata/pull/6069)] Upgrade Guava dependencies to fix security vulnerabilities
- [[#6147](https://github.com/seata/seata/pull/6147)] upgrade kafka-clients to 3.5.2
- [[#6144](https://github.com/seata/seata/pull/6144)] upgrade nacos client to 1.4.6
- [[#6147](https://github.com/seata/seata/pull/6147)] upgrade kafka-clients to 3.6.1

### test:
- [[#6081](https://github.com/seata/seata/pull/6081)] add `test-os.yml` for testing the OS
Expand All @@ -39,7 +40,6 @@ Thanks to these contributors for their code commits. Please report an unintended

<!-- Please make sure your Github ID is in the list below -->
- [slievrly](https://github.com/slievrly)
- [yiqi](https://github.com/PleaseGiveMeTheCoke)
- [ptyin](https://github.com/ptyin)
- [laywin](https://github.com/laywin)
- [imcmai](https://github.com/imcmai)
Expand Down
7 changes: 4 additions & 3 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
- [[#6075](https://github.com/seata/seata/pull/6075)] 修复镜像SQL对于on update列没有添加表别名的问题
- [[#6086](https://github.com/seata/seata/pull/6086)] 修复oracle alias 解析异常
- [[#6085](https://github.com/seata/seata/pull/6085)] 修复jdk9+版本编译后,引入后ByteBuffer#flip NoSuchMethodError的问题
- [[#6101](https://github.com/seata/seata/pull/6101)] 修复在dubbo 3.x的版本中, 消费者端不能生成tcc代理的问题
- [[#6077](https://github.com/seata/seata/pull/6077)] 修复表存在复合主键索引导致无法回滚问题
- [[#6121](https://github.com/seata/seata/pull/6121)] 修复回滚分支事务时没有按照时间排序的问题

### optimize:
- [[#6061](https://github.com/seata/seata/pull/6061)] 合并rm和tm的rpcMergeMessageSend线程,增加线程休眠时长
- [[#6031](https://github.com/seata/seata/pull/6031)] 添加undo_log表的存在性校验
- [[#6089](https://github.com/seata/seata/pull/6089)] 修改RaftServerFactory语义并删除不必要的单例构建
- [[#4473](https://github.com/seata/seata/pull/4473)] rm appdata大小限制
Expand All @@ -27,7 +27,9 @@

### security:
- [[#6069](https://github.com/seata/seata/pull/6069)] 升级Guava依赖版本,修复安全漏洞
- [[#6147](https://github.com/seata/seata/pull/6147)] 升级 kafka-clients依赖至3.5.2
- [[#6144](https://github.com/seata/seata/pull/6144)] 升级Nacos依赖版本至1.4.6
- [[#6145](https://github.com/seata/seata/pull/6145)] 升级 jettison依赖版本至1.5.4
- [[#6147](https://github.com/seata/seata/pull/6147)] 升级 kafka-clients依赖至3.6.1

### test:
- [[#6081](https://github.com/seata/seata/pull/6081)] 添加 `test-os.yml` 用于测试seata在各种操作系统下的运行情况
Expand All @@ -37,7 +39,6 @@

<!-- 请确保您的 GitHub ID 在以下列表中 -->
- [slievrly](https://github.com/slievrly)
- [yiqi](https://github.com/PleaseGiveMeTheCoke)
- [ptyin](https://github.com/ptyin)
- [laywin](https://github.com/laywin)
- [imcmai](https://github.com/imcmai)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/io/seata/core/rpc/RemotingServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public interface RemotingServer {
* @param resourceId rm client resourceId
* @param clientId rm client id
* @param msg transaction message {@code io.seata.core.protocol}
* @param tryOtherApp try other app
* @param tryOtherApp try other app
* @return client result message
* @throws TimeoutException TimeoutException
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public abstract class AbstractNettyRemoting implements Disposable {
/**
* The Is sending.
*/
protected static volatile boolean isSending = false;
protected volatile boolean isSending = false;
private String group = "DEFAULT";

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -79,13 +77,14 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
private static final String MSG_ID_PREFIX = "msgId:";
private static final String FUTURES_PREFIX = "futures:";
private static final String SINGLE_LOG_POSTFIX = ";";
private static final int MAX_MERGE_SEND_MILLS = 10;
private static final int MAX_MERGE_SEND_MILLS = 1;
private static final String THREAD_PREFIX_SPLIT_CHAR = "_";
private static final int MAX_MERGE_SEND_THREAD = 1;
private static final long KEEP_ALIVE_TIME = Integer.MAX_VALUE;
private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;
private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
private static final String MERGE_THREAD_NAME = "rpcMergeMessageSend";
protected static final Object MERGE_LOCK = new Object();
private static final String MERGE_THREAD_PREFIX = "rpcMergeMessageSend";
protected final Object mergeLock = new Object();

/**
* When sending message type is {@link MergeMessage}, will be stored to mergeMsgMap.
Expand All @@ -97,39 +96,29 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting
* Send via asynchronous thread {@link io.seata.core.rpc.netty.AbstractNettyRemotingClient.MergedSendRunnable}
* {@link AbstractNettyRemotingClient#isEnableClientBatchSendRequest()}
*/
protected static final ConcurrentHashMap<String/*serverAddress*/, Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>>> BASKET_MAP = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String/*serverAddress*/, BlockingQueue<RpcMessage>> basketMap = new ConcurrentHashMap<>();
private final NettyClientBootstrap clientBootstrap;
private final NettyClientChannelManager clientChannelManager;
private final NettyPoolKey.TransactionRole transactionRole;
private static volatile ExecutorService mergeSendExecutorService;
private ExecutorService mergeSendExecutorService;
private TransactionMessageHandler transactionMessageHandler;
protected volatile boolean enableClientBatchSendRequest;

@Override
public void init() {
timerExecutor.scheduleAtFixedRate(() -> clientChannelManager.reconnect(getTransactionServiceGroup()), SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
if (this.isEnableClientBatchSendRequest()) {
startMergeSendThread();
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
super.init();
clientBootstrap.start();
}

private void startMergeSendThread() {
if (mergeSendExecutorService == null) {
synchronized (AbstractNettyRemoting.class) {
if (mergeSendExecutorService == null) {
mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
MAX_MERGE_SEND_THREAD,
KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(MERGE_THREAD_NAME, MAX_MERGE_SEND_THREAD));
mergeSendExecutorService.submit(new MergedSendRunnable());
}
}
}
}

public AbstractNettyRemotingClient(NettyClientConfig nettyClientConfig, EventExecutorGroup eventExecutorGroup,
ThreadPoolExecutor messageExecutor, NettyPoolKey.TransactionRole transactionRole) {
super(messageExecutor);
Expand Down Expand Up @@ -157,14 +146,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
futures.put(rpcMessage.getId(), messageFuture);

// put message into basketMap
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> roleMessage = CollectionUtils.computeIfAbsent(BASKET_MAP, serverAddress,
key -> {
Map<NettyPoolKey.TransactionRole, BlockingQueue<RpcMessage>> map = new HashMap<>(2);
map.put(NettyPoolKey.TransactionRole.TMROLE, new LinkedBlockingQueue<>());
map.put(NettyPoolKey.TransactionRole.RMROLE, new LinkedBlockingQueue<>());
return map;
});
BlockingQueue<RpcMessage> basket = roleMessage.get(transactionRole);
BlockingQueue<RpcMessage> basket = CollectionUtils.computeIfAbsent(basketMap, serverAddress,
key -> new LinkedBlockingQueue<>());
if (!basket.offer(rpcMessage)) {
LOGGER.error("put message into basketMap offer failed, serverAddress:{},rpcMessage:{}",
serverAddress, rpcMessage);
Expand All @@ -174,8 +157,8 @@ public Object sendSyncRequest(Object msg) throws TimeoutException {
LOGGER.debug("offer message: {}", rpcMessage.getBody());
}
if (!isSending) {
synchronized (MERGE_LOCK) {
MERGE_LOCK.notifyAll();
synchronized (mergeLock) {
mergeLock.notifyAll();
}
}

Expand Down Expand Up @@ -308,6 +291,10 @@ protected String getXid(Object msg) {
return StringUtils.isBlank(xid) ? String.valueOf(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)) : xid;
}

private String getThreadPrefix() {
return AbstractNettyRemotingClient.MERGE_THREAD_PREFIX + THREAD_PREFIX_SPLIT_CHAR + transactionRole.name();
}

/**
* Get pool key function.
*
Expand Down Expand Up @@ -339,72 +326,59 @@ protected String getXid(Object msg) {
/**
* The type Merged send runnable.
*/
private static class MergedSendRunnable implements Runnable {
private class MergedSendRunnable implements Runnable {

@Override
public void run() {
while (true) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
synchronized (MERGE_LOCK) {
if (BASKET_MAP.values().stream().allMatch(map -> map.values().stream().allMatch(Collection::isEmpty))) {
try {
MERGE_LOCK.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
synchronized (mergeLock) {
try {
mergeLock.wait(MAX_MERGE_SEND_MILLS);
} catch (InterruptedException e) {
}
}
isSending = true;
BASKET_MAP.forEach((address, roleMessage) -> roleMessage.forEach((role, basket) -> {
basketMap.forEach((address, basket) -> {
if (basket.isEmpty()) {
return;
}

AbstractNettyRemotingClient client;
if (role.equals(NettyPoolKey.TransactionRole.RMROLE)) {
client = RmNettyRemotingClient.getInstance();
} else {
client = TmNettyRemotingClient.getInstance();
}

ConcurrentHashMap<Integer, MessageFuture> clientFutures = client.getFutures();

MergedWarpMessage mergeMessage = new MergedWarpMessage();
while (!basket.isEmpty()) {
RpcMessage msg = basket.poll();
mergeMessage.msgs.add((AbstractMessage) msg.getBody());
mergeMessage.msgIds.add(msg.getId());
}
if (mergeMessage.msgIds.size() > 1) {
printMergeMessageLog(clientFutures, mergeMessage);
printMergeMessageLog(mergeMessage);
}
Channel sendChannel = null;
try {
// send batch message is sync request, but there is no need to get the return value.
// Since the messageFuture has been created before the message is placed in basketMap,
// the return value will be obtained in ClientOnResponseProcessor.
sendChannel = client.getClientChannelManager().acquireChannel(address);
client.sendAsyncRequest(sendChannel, mergeMessage);
sendChannel = clientChannelManager.acquireChannel(address);
AbstractNettyRemotingClient.this.sendAsyncRequest(sendChannel, mergeMessage);
} catch (FrameworkException e) {
if (e.getErrcode() == FrameworkErrorCode.ChannelIsNotWritable && sendChannel != null) {
client.destroyChannel(address, sendChannel);
destroyChannel(address, sendChannel);
}
// fast fail
for (Integer msgId : mergeMessage.msgIds) {
MessageFuture messageFuture = clientFutures.remove(msgId);
MessageFuture messageFuture = futures.remove(msgId);
if (messageFuture != null) {
messageFuture.setResultMessage(
new RuntimeException(String.format("%s is unreachable", address), e));
new RuntimeException(String.format("%s is unreachable", address), e));
}
}
LOGGER.error("client merge call failed: {}", e.getMessage(), e);
}
}));
});
isSending = false;
}
}

private void printMergeMessageLog(ConcurrentHashMap<Integer, MessageFuture> clientFutures, MergedWarpMessage mergeMessage) {
private void printMergeMessageLog(MergedWarpMessage mergeMessage) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("merge msg size:{}", mergeMessage.msgIds.size());
for (AbstractMessage cm : mergeMessage.msgs) {
Expand All @@ -415,7 +389,7 @@ private void printMergeMessageLog(ConcurrentHashMap<Integer, MessageFuture> clie
sb.append(MSG_ID_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
sb.append("\n");
for (long l : clientFutures.keySet()) {
for (long l : futures.keySet()) {
sb.append(FUTURES_PREFIX).append(l).append(SINGLE_LOG_POSTFIX);
}
LOGGER.debug(sb.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ private static Channel getChannelFromSameClientMap(Map<Integer, RpcContext> clie
*
* @param resourceId Resource ID
* @param clientId Client ID - ApplicationId:IP:Port
* @param tryOtherApp try other app
* @return Corresponding channel, NULL if not found.
*/
public static Channel getChannel(String resourceId, String clientId, boolean tryOtherApp) {
Expand Down
16 changes: 13 additions & 3 deletions dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,10 @@
<spring-context-support.version>1.0.2</spring-context-support.version>
<mock-jedis.version>0.3.0</mock-jedis.version>
<apollo-client.version>2.0.1</apollo-client.version>
<eureka-clients.version>1.10.17</eureka-clients.version>
<eureka-clients.version>1.10.18</eureka-clients.version>
<jettison.version>1.5.4</jettison.version>
<consul-clients.version>1.4.2</consul-clients.version>
<nacos-client.version>1.4.2</nacos-client.version>
<nacos-client.version>1.4.6</nacos-client.version>
<etcd-client-v3.version>0.5.0</etcd-client-v3.version>
<testcontainers.version>1.11.2</testcontainers.version>
<guava.version>32.0.0-jre</guava.version>
Expand Down Expand Up @@ -91,7 +92,7 @@
<logback.version>1.2.9</logback.version>
<logstash-logback-encoder.version>6.5</logstash-logback-encoder.version>
<kafka-appender.version>0.2.0-RC2</kafka-appender.version>
<kafka-clients.version>3.5.2</kafka-clients.version>
<kafka-clients.version>3.6.1</kafka-clients.version>

<!-- redis -->
<jedis.version>3.8.0</jedis.version>
Expand Down Expand Up @@ -411,8 +412,17 @@
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>${jettison.version}</version>
</dependency>
<dependency>
<groupId>com.netflix.archaius</groupId>
<artifactId>archaius-core</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions discovery/seata-discovery-eureka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.netflix.archaius</groupId>
<artifactId>archaius-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ private DubboUtil() {
private static final String ALIBABA_DUBBO_PROXY_NAME_PREFIX = "com.alibaba.dubbo.common.bytecode.proxy";
private static final String APACHE_DUBBO_PROXY_NAME_PREFIX = "org.apache.dubbo.common.bytecode.proxy";

private static final String DUBBO_3_X_PARTIAL_PROXY_NAME = "DubboProxy";

/**
* get the interface class of the dubbo proxy which be generated by javaassist
*
Expand Down Expand Up @@ -69,6 +71,6 @@ public static Class<?> getAssistInterface(Object proxyBean)
}

public static boolean isDubboProxyName(String name) {
return name.startsWith(ALIBABA_DUBBO_PROXY_NAME_PREFIX) || name.startsWith(APACHE_DUBBO_PROXY_NAME_PREFIX);
return name.startsWith(ALIBABA_DUBBO_PROXY_NAME_PREFIX) || name.startsWith(APACHE_DUBBO_PROXY_NAME_PREFIX) || name.contains(DUBBO_3_X_PARTIAL_PROXY_NAME);
}
}

0 comments on commit fda95a9

Please sign in to comment.