Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8460] Improve the pop revive process when reading biz messages from a remote broker - part2 #8494

Merged
merged 6 commits into from
Aug 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.tieredstore.TieredMessageStore;

public class EscapeBridge {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
Expand Down Expand Up @@ -99,7 +101,7 @@ public PutMessageResult putMessage(MessageExtBrokerInner messageExt) {

try {
messageExt.setWaitStoreMsgOK(false);
final SendResult sendResult = putMessageToRemoteBroker(messageExt);
final SendResult sendResult = putMessageToRemoteBroker(messageExt, null);
return transformSendResult2PutResult(sendResult);
} catch (Exception e) {
LOG.error("sendMessageInFailover to remote failed", e);
Expand All @@ -112,7 +114,10 @@ public PutMessageResult putMessage(MessageExtBrokerInner messageExt) {
}
}

private SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt) {
public SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt, String brokerNameToSend) {
if (this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend)) { // not remote broker
return null;
}
final boolean isTransHalfMessage = TransactionalMessageUtil.buildHalfTopic().equals(messageExt.getTopic());
MessageExtBrokerInner messageToPut = messageExt;
if (isTransHalfMessage) {
Expand All @@ -125,12 +130,26 @@ private SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt) {
return null;
}

final MessageQueue mqSelected = topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName());

messageToPut.setQueueId(mqSelected.getQueueId());
final MessageQueue mqSelected;
if (StringUtils.isEmpty(brokerNameToSend)) {
mqSelected = topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName());
messageToPut.setQueueId(mqSelected.getQueueId());
brokerNameToSend = mqSelected.getBrokerName();
if (this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend)) {
LOG.warn("putMessageToRemoteBroker failed, remote broker not found. Topic: {}, MsgId: {}, Broker: {}",
messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend);
return null;
}
} else {
mqSelected = new MessageQueue(messageExt.getTopic(), brokerNameToSend, messageExt.getQueueId());
}

final String brokerNameToSend = mqSelected.getBrokerName();
final String brokerAddrToSend = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend);
if (null == brokerAddrToSend) {
LOG.warn("putMessageToRemoteBroker failed, remote broker address not found. Topic: {}, MsgId: {}, Broker: {}",
messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend);
return null;
}

final long beginTimestamp = System.currentTimeMillis();
try {
Expand Down Expand Up @@ -279,8 +298,12 @@ public CompletableFuture<Triple<MessageExt, String, Boolean>> getMessageAsync(St
}
List<MessageExt> list = decodeMsgList(result, deCompressBody);
if (list == null || list.isEmpty()) {
LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, result is {}", topic, offset, queueId, result);
return Triple.of(null, "Can not get msg", false); // local store, so no retry
// OFFSET_FOUND_NULL returned by TieredMessageStore indicates exception occurred
boolean needRetry = GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus())
&& messageStore instanceof TieredMessageStore;
LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, needRetry {}, result is {}",
topic, offset, queueId, needRetry, result);
return Triple.of(null, "Can not get msg", needRetry);
}
return Triple.of(list.get(0), "", false);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ private boolean reachTail(PullResult pullResult, long offset) {
}

// Triple<MessageExt, info, needRetry>
private CompletableFuture<Triple<MessageExt, String, Boolean>> getBizMessage(String topic, long offset, int queueId,
String brokerName) {
return this.brokerController.getEscapeBridge().getMessageAsync(topic, offset, queueId, brokerName, false);
public CompletableFuture<Triple<MessageExt, String, Boolean>> getBizMessage(PopCheckPoint popCheckPoint, long offset) {
return this.brokerController.getEscapeBridge().getMessageAsync(popCheckPoint.getTopic(), offset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName(), false);
}

public PullResult getMessage(String group, String topic, int queueId, long offset, int nums,
Expand Down Expand Up @@ -358,7 +357,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
if (point.getTopic() == null || point.getCId() == null) {
continue;
}
map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime() + point.getBrokerName(), point);
PopMetricsManager.incPopReviveCkGetCount(point, queueId);
point.setReviveOffset(messageExt.getQueueOffset());
if (firstRt == 0) {
Expand All @@ -371,7 +370,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
}
AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime();
String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName();
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
Expand All @@ -396,7 +395,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {

BatchAckMsg bAckMsg = JSON.parseObject(raw, BatchAckMsg.class);
PopMetricsManager.incPopReviveAckGetCount(bAckMsg, queueId);
String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime();
String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime() + bAckMsg.getBrokerName();
PopCheckPoint point = map.get(mergeKey);
if (point == null) {
if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) {
Expand Down Expand Up @@ -528,7 +527,7 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {

// retry msg
long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName())
CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint, msgOffset)
.thenApply(rst -> {
MessageExt message = rst.getLeft();
if (message == null) {
Expand Down Expand Up @@ -568,9 +567,9 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {

private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
int rePutTimes = oldCK.parseRePutTimes();
if (rePutTimes >= ckRewriteIntervalsInSeconds.length) {
POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime());
if (rePutTimes >= ckRewriteIntervalsInSeconds.length && brokerController.getBrokerConfig().isSkipWhenCKRePutReachMaxTimes()) {
POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}, {}", oldCK.getTopic(), oldCK.getCId(),
oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime(), rePutTimes);
return;
}

Expand All @@ -588,7 +587,8 @@ private void rePutCK(PopCheckPoint oldCK, Pair<Long, Boolean> pair) {
newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always increment even if removed from reviveRequestMap
if (oldCK.getReviveTime() <= System.currentTimeMillis()) {
// never expect an ACK matched in the future, we just use it to rewrite CK and try to revive retry message next time
newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[rePutTimes] * 1000);
int intervalIndex = rePutTimes >= ckRewriteIntervalsInSeconds.length ? ckRewriteIntervalsInSeconds.length - 1 : rePutTimes;
newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[intervalIndex] * 1000);
}
MessageExtBrokerInner ckMsg = brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId);
brokerController.getMessageStore().putMessage(ckMsg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@
import org.apache.rocketmq.broker.topic.TopicRouteInfoManager;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.GetMessageResult;
import org.apache.rocketmq.store.GetMessageStatus;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
Expand All @@ -58,6 +62,9 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.verify;

@RunWith(MockitoJUnitRunner.class)
public class EscapeBridgeTest {
Expand All @@ -75,6 +82,9 @@ public class EscapeBridgeTest {
@Mock
private DefaultMessageStore defaultMessageStore;

@Mock
private TieredMessageStore tieredMessageStore;

private GetMessageResult getMessageResult;

@Mock
Expand Down Expand Up @@ -200,14 +210,37 @@ public void getMessageAsyncTest_localStore_getMessageAsync_null() {
}

@Test
public void getMessageAsyncTest_localStore_decodeNothing() throws Exception {
public void getMessageAsyncTest_localStore_decodeNothing_DefaultMessageStore() throws Exception {
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore);
when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any()))
.thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(0, TEST_TOPIC, null)));
Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join();
Assert.assertNull(rst.getLeft());
Assert.assertEquals("Can not get msg", rst.getMiddle());
Assert.assertFalse(rst.getRight()); // no retry
for (GetMessageStatus status : GetMessageStatus.values()) {
GetMessageResult getMessageResult = mockGetMessageResult(0, TEST_TOPIC, null);
getMessageResult.setStatus(status);
when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any()))
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join();
Assert.assertNull(rst.getLeft());
Assert.assertEquals("Can not get msg", rst.getMiddle());
Assert.assertFalse(rst.getRight()); // DefaultMessageStore, no retry
}
}

@Test
public void getMessageAsyncTest_localStore_decodeNothing_TieredMessageStore() throws Exception {
when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(tieredMessageStore);
for (GetMessageStatus status : GetMessageStatus.values()) {
GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(status);
when(tieredMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any()))
.thenReturn(CompletableFuture.completedFuture(getMessageResult));
Triple<MessageExt, String, Boolean> rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join();
Assert.assertNull(rst.getLeft());
Assert.assertEquals("Can not get msg", rst.getMiddle());
if (GetMessageStatus.OFFSET_FOUND_NULL.equals(status)) {
Assert.assertTrue(rst.getRight()); // TieredMessageStore returns OFFSET_FOUND_NULL, need retry
} else {
Assert.assertFalse(rst.getRight()); // other status, like DefaultMessageStore, no retry
}
}
}

@Test
Expand Down Expand Up @@ -320,6 +353,57 @@ public void decodeMsgListTest_messageNotNull() throws Exception {
Assert.assertTrue(Arrays.equals(msg.getBody(), list.get(0).getBody()));
}

@Test
public void testPutMessageToRemoteBroker_noSpecificBrokerName_hasRemoteBroker() throws Exception {
MessageExtBrokerInner message = new MessageExtBrokerInner();
message.setTopic(TEST_TOPIC);
String anotherBrokerName = "broker_b";
TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME, anotherBrokerName);
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1");
escapeBridge.putMessageToRemoteBroker(message, null);
verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"), eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(), anyLong());
}

@Test
public void testPutMessageToRemoteBroker_noSpecificBrokerName_noRemoteBroker() throws Exception {
MessageExtBrokerInner message = new MessageExtBrokerInner();
message.setTopic(TEST_TOPIC);
TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME);
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
escapeBridge.putMessageToRemoteBroker(message, null);
verify(topicRouteInfoManager, times(0)).findBrokerAddressInPublish(anyString());
}

@Test
public void testPutMessageToRemoteBroker_specificBrokerName_equals() throws Exception {
escapeBridge.putMessageToRemoteBroker(new MessageExtBrokerInner(), BROKER_NAME);
verify(topicRouteInfoManager, times(0)).tryToFindTopicPublishInfo(anyString());
}

@Test
public void testPutMessageToRemoteBroker_specificBrokerName_addressNotFound() throws Exception {
MessageExtBrokerInner message = new MessageExtBrokerInner();
message.setTopic(TEST_TOPIC);
TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME);
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
escapeBridge.putMessageToRemoteBroker(message, "whatever");
verify(topicRouteInfoManager).findBrokerAddressInPublish(eq("whatever"));
verify(brokerOuterAPI, times(0)).sendMessageToSpecificBroker(anyString(), anyString(), any(MessageExtBrokerInner.class), anyString(), anyLong());
}

@Test
public void testPutMessageToRemoteBroker_specificBrokerName_addressFound() throws Exception {
MessageExtBrokerInner message = new MessageExtBrokerInner();
message.setTopic(TEST_TOPIC);
String anotherBrokerName = "broker_b";
TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME, anotherBrokerName);
when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo);
when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1");
escapeBridge.putMessageToRemoteBroker(message, anotherBrokerName);
verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"), eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(), anyLong());
}

private GetMessageResult mockGetMessageResult(int count, String topic, byte[] body) throws Exception {
GetMessageResult result = new GetMessageResult();
for (int i = 0; i < count; i++) {
Expand All @@ -337,4 +421,12 @@ private GetMessageResult mockGetMessageResult(int count, String topic, byte[] bo
return result;
}

private TopicPublishInfo mockTopicPublishInfo(String... brokerNames) {
TopicPublishInfo topicPublishInfo = new TopicPublishInfo();
for (String brokerName : brokerNames) {
topicPublishInfo.getMessageQueueList().add(new MessageQueue(TEST_TOPIC, brokerName, 0));
}
return topicPublishInfo;
}

}
Loading
Loading