Skip to content

Commit

Permalink
[alibaba#1540] 4.5.2版本 事务消息设置延迟时间后导致RMQ_SYS_TRANS_HALF_TOPIC爆掉的问题修复
Browse files Browse the repository at this point in the history
  • Loading branch information
huangying committed Oct 18, 2019
1 parent 034bebc commit 45043e2
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1179,6 +1179,12 @@ public TransactionSendResult sendMessageInTransaction(final Message msg,
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}

// 事务消息屏蔽DelayTimeLevel参数
if(msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}

Validators.checkMessage(msg, this.defaultMQProducer);

SendResult sendResult = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package org.apache.rocketmq.client.producer;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import static org.assertj.core.api.Assertions.assertThat;

/**
* @program: rocketmq
* @description: 事务消息测试类
* @author: TonyStark888
* @create: 2019-10-18 11:22
**/
public class TransactionMQProducerTest {
private TransactionMQProducer producer;
private String producerGroupPrefix = "Transaction_PID";

@Before
public void init() throws Exception {
String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis();
producer = new TransactionMQProducer(producerGroupTemp);
producer.setNamesrvAddr("10.0.133.29:9876");
producer.setTransactionListener(new DelayTimeLevelTransactionListener());
producer.start();
}

@After
public void terminate() {
producer.shutdown();
}

@Test
public void testSendMessage() throws InterruptedException, RemotingException, MQBrokerException {
try {
Message message = new Message("TransactionTopic", "transactionTest", "msg-1", ("HelloTime").getBytes());
SendResult result = producer.sendMessageInTransaction(message, "HelloTime");
System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId());

// 挂起5分钟,等待事务回查
Thread.sleep(5 * 60 * 1000L);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("Transaction Message Send Error");
}
}

@Test
public void testSendMessage_DelayTimeLevel() throws RemotingException, InterruptedException, MQBrokerException {
try {
Message message = new Message("TransactionTopic", "transactionTest", "msg-1", ("HelloDelayTime").getBytes());
message.setDelayTimeLevel(1);
SendResult result = producer.sendMessageInTransaction(message, "HelloDelayTime");
System.out.printf("Topic:%s send success, misId is:%s%n", message.getTopic(), result.getMsgId());

// 挂起5分钟,等待事务回查
Thread.sleep(5 * 60 * 1000L);
} catch (MQClientException e) {
assertThat(e).hasMessageContaining("Transaction Message Send Error");
}
}
}

/**
* 简便起见,直接返回Commit
*/
class DelayTimeLevelTransactionListener implements TransactionListener {
/**
* 执行本地事务
*
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
return LocalTransactionState.COMMIT_MESSAGE;
}

/**
* 回查本地事务结果
*
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
return LocalTransactionState.COMMIT_MESSAGE;
}
}

0 comments on commit 45043e2

Please sign in to comment.