From 97895ed137793a296e5636a2c00f42a7109919da Mon Sep 17 00:00:00 2001 From: imzs Date: Mon, 5 Aug 2024 19:56:51 +0800 Subject: [PATCH 1/6] [ISSUE #8460] do retry when TieredMessageStore returns GetMessageStatus.OFFSET_FOUND_NULL; support putting message to specific broker --- .../broker/failover/EscapeBridge.java | 39 +++++-- .../broker/failover/EscapeBridgeTest.java | 106 ++++++++++++++++-- 2 files changed, 130 insertions(+), 15 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java index 7df49f8c470..762d917d640 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java @@ -48,9 +48,11 @@ import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; +import org.apache.rocketmq.tieredstore.TieredMessageStore; public class EscapeBridge { protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -99,7 +101,7 @@ public PutMessageResult putMessage(MessageExtBrokerInner messageExt) { try { messageExt.setWaitStoreMsgOK(false); - final SendResult sendResult = putMessageToRemoteBroker(messageExt); + final SendResult sendResult = putMessageToRemoteBroker(messageExt, null); return transformSendResult2PutResult(sendResult); } catch (Exception e) { LOG.error("sendMessageInFailover to remote failed", e); @@ -112,7 +114,10 @@ public PutMessageResult putMessage(MessageExtBrokerInner messageExt) { } } - private SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt) { + public SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt, String brokerNameToSend) { + if (this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend)) { // not remote broker + return null; + } final boolean isTransHalfMessage = TransactionalMessageUtil.buildHalfTopic().equals(messageExt.getTopic()); MessageExtBrokerInner messageToPut = messageExt; if (isTransHalfMessage) { @@ -125,12 +130,26 @@ private SendResult putMessageToRemoteBroker(MessageExtBrokerInner messageExt) { return null; } - final MessageQueue mqSelected = topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName()); - - messageToPut.setQueueId(mqSelected.getQueueId()); + final MessageQueue mqSelected; + if (StringUtils.isEmpty(brokerNameToSend)) { + mqSelected = topicPublishInfo.selectOneMessageQueue(this.brokerController.getBrokerConfig().getBrokerName()); + messageToPut.setQueueId(mqSelected.getQueueId()); + brokerNameToSend = mqSelected.getBrokerName(); + if (this.brokerController.getBrokerConfig().getBrokerName().equals(brokerNameToSend)) { + LOG.warn("putMessageToRemoteBroker failed, remote broker not found. Topic: {}, MsgId: {}, Broker: {}", + messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend); + return null; + } + } else { + mqSelected = new MessageQueue(messageExt.getTopic(), brokerNameToSend, messageExt.getQueueId()); + } - final String brokerNameToSend = mqSelected.getBrokerName(); final String brokerAddrToSend = this.brokerController.getTopicRouteInfoManager().findBrokerAddressInPublish(brokerNameToSend); + if (null == brokerAddrToSend) { + LOG.warn("putMessageToRemoteBroker failed, remote broker address not found. Topic: {}, MsgId: {}, Broker: {}", + messageExt.getTopic(), messageExt.getMsgId(), brokerNameToSend); + return null; + } final long beginTimestamp = System.currentTimeMillis(); try { @@ -279,8 +298,12 @@ public CompletableFuture> getMessageAsync(St } List list = decodeMsgList(result, deCompressBody); if (list == null || list.isEmpty()) { - LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, result is {}", topic, offset, queueId, result); - return Triple.of(null, "Can not get msg", false); // local store, so no retry + // OFFSET_FOUND_NULL returned by TieredMessageStore indicates exception occurred + boolean needRetry = GetMessageStatus.OFFSET_FOUND_NULL.equals(result.getStatus()) + && messageStore instanceof TieredMessageStore; + LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, needRetry {}, result is {}", + topic, offset, queueId, needRetry, result); + return Triple.of(null, "Can not get msg", needRetry); } return Triple.of(list.get(0), "", false); }); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java index 7ea06665c3e..27fc37dbec8 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/failover/EscapeBridgeTest.java @@ -30,19 +30,23 @@ import org.apache.rocketmq.broker.topic.TopicRouteInfoManager; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.message.MessageDecoder; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.GetMessageResult; +import org.apache.rocketmq.store.GetMessageStatus; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.logfile.DefaultMappedFile; import org.apache.rocketmq.store.logfile.MappedFile; +import org.apache.rocketmq.tieredstore.TieredMessageStore; import org.assertj.core.api.Assertions; import org.junit.After; import org.junit.Assert; @@ -58,6 +62,9 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.verify; @RunWith(MockitoJUnitRunner.class) public class EscapeBridgeTest { @@ -75,6 +82,9 @@ public class EscapeBridgeTest { @Mock private DefaultMessageStore defaultMessageStore; + @Mock + private TieredMessageStore tieredMessageStore; + private GetMessageResult getMessageResult; @Mock @@ -200,14 +210,37 @@ public void getMessageAsyncTest_localStore_getMessageAsync_null() { } @Test - public void getMessageAsyncTest_localStore_decodeNothing() throws Exception { + public void getMessageAsyncTest_localStore_decodeNothing_DefaultMessageStore() throws Exception { when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(defaultMessageStore); - when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())) - .thenReturn(CompletableFuture.completedFuture(mockGetMessageResult(0, TEST_TOPIC, null))); - Triple rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join(); - Assert.assertNull(rst.getLeft()); - Assert.assertEquals("Can not get msg", rst.getMiddle()); - Assert.assertFalse(rst.getRight()); // no retry + for (GetMessageStatus status : GetMessageStatus.values()) { + GetMessageResult getMessageResult = mockGetMessageResult(0, TEST_TOPIC, null); + getMessageResult.setStatus(status); + when(defaultMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())) + .thenReturn(CompletableFuture.completedFuture(getMessageResult)); + Triple rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertEquals("Can not get msg", rst.getMiddle()); + Assert.assertFalse(rst.getRight()); // DefaultMessageStore, no retry + } + } + + @Test + public void getMessageAsyncTest_localStore_decodeNothing_TieredMessageStore() throws Exception { + when(brokerController.getMessageStoreByBrokerName(any())).thenReturn(tieredMessageStore); + for (GetMessageStatus status : GetMessageStatus.values()) { + GetMessageResult getMessageResult = new GetMessageResult(); + getMessageResult.setStatus(status); + when(tieredMessageStore.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())) + .thenReturn(CompletableFuture.completedFuture(getMessageResult)); + Triple rst = escapeBridge.getMessageAsync(TEST_TOPIC, 0, DEFAULT_QUEUE_ID, BROKER_NAME, false).join(); + Assert.assertNull(rst.getLeft()); + Assert.assertEquals("Can not get msg", rst.getMiddle()); + if (GetMessageStatus.OFFSET_FOUND_NULL.equals(status)) { + Assert.assertTrue(rst.getRight()); // TieredMessageStore returns OFFSET_FOUND_NULL, need retry + } else { + Assert.assertFalse(rst.getRight()); // other status, like DefaultMessageStore, no retry + } + } } @Test @@ -320,6 +353,57 @@ public void decodeMsgListTest_messageNotNull() throws Exception { Assert.assertTrue(Arrays.equals(msg.getBody(), list.get(0).getBody())); } + @Test + public void testPutMessageToRemoteBroker_noSpecificBrokerName_hasRemoteBroker() throws Exception { + MessageExtBrokerInner message = new MessageExtBrokerInner(); + message.setTopic(TEST_TOPIC); + String anotherBrokerName = "broker_b"; + TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME, anotherBrokerName); + when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo); + when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1"); + escapeBridge.putMessageToRemoteBroker(message, null); + verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"), eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(), anyLong()); + } + + @Test + public void testPutMessageToRemoteBroker_noSpecificBrokerName_noRemoteBroker() throws Exception { + MessageExtBrokerInner message = new MessageExtBrokerInner(); + message.setTopic(TEST_TOPIC); + TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME); + when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo); + escapeBridge.putMessageToRemoteBroker(message, null); + verify(topicRouteInfoManager, times(0)).findBrokerAddressInPublish(anyString()); + } + + @Test + public void testPutMessageToRemoteBroker_specificBrokerName_equals() throws Exception { + escapeBridge.putMessageToRemoteBroker(new MessageExtBrokerInner(), BROKER_NAME); + verify(topicRouteInfoManager, times(0)).tryToFindTopicPublishInfo(anyString()); + } + + @Test + public void testPutMessageToRemoteBroker_specificBrokerName_addressNotFound() throws Exception { + MessageExtBrokerInner message = new MessageExtBrokerInner(); + message.setTopic(TEST_TOPIC); + TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME); + when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo); + escapeBridge.putMessageToRemoteBroker(message, "whatever"); + verify(topicRouteInfoManager).findBrokerAddressInPublish(eq("whatever")); + verify(brokerOuterAPI, times(0)).sendMessageToSpecificBroker(anyString(), anyString(), any(MessageExtBrokerInner.class), anyString(), anyLong()); + } + + @Test + public void testPutMessageToRemoteBroker_specificBrokerName_addressFound() throws Exception { + MessageExtBrokerInner message = new MessageExtBrokerInner(); + message.setTopic(TEST_TOPIC); + String anotherBrokerName = "broker_b"; + TopicPublishInfo publishInfo = mockTopicPublishInfo(BROKER_NAME, anotherBrokerName); + when(topicRouteInfoManager.tryToFindTopicPublishInfo(anyString())).thenReturn(publishInfo); + when(topicRouteInfoManager.findBrokerAddressInPublish(anotherBrokerName)).thenReturn("127.0.0.1"); + escapeBridge.putMessageToRemoteBroker(message, anotherBrokerName); + verify(brokerOuterAPI).sendMessageToSpecificBroker(eq("127.0.0.1"), eq(anotherBrokerName), any(MessageExtBrokerInner.class), anyString(), anyLong()); + } + private GetMessageResult mockGetMessageResult(int count, String topic, byte[] body) throws Exception { GetMessageResult result = new GetMessageResult(); for (int i = 0; i < count; i++) { @@ -337,4 +421,12 @@ private GetMessageResult mockGetMessageResult(int count, String topic, byte[] bo return result; } + private TopicPublishInfo mockTopicPublishInfo(String... brokerNames) { + TopicPublishInfo topicPublishInfo = new TopicPublishInfo(); + for (String brokerName : brokerNames) { + topicPublishInfo.getMessageQueueList().add(new MessageQueue(TEST_TOPIC, brokerName, 0)); + } + return topicPublishInfo; + } + } From faff309856490d99f2c14f4ddb8aef52309e7103 Mon Sep 17 00:00:00 2001 From: imzs Date: Mon, 5 Aug 2024 21:39:47 +0800 Subject: [PATCH 2/6] [ISSUE #8460] add a switch [skipWhenCKRePutReachMaxTimes] whether to continue to rewrite CK after max times --- .../broker/processor/PopReviveService.java | 9 ++-- .../processor/PopReviveServiceTest.java | 44 ++++++++++++++++++- .../apache/rocketmq/common/BrokerConfig.java | 11 +++++ 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 114d094600e..6088fd8da39 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -568,9 +568,9 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) { private void rePutCK(PopCheckPoint oldCK, Pair pair) { int rePutTimes = oldCK.parseRePutTimes(); - if (rePutTimes >= ckRewriteIntervalsInSeconds.length) { - POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}", oldCK.getTopic(), oldCK.getCId(), - oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime()); + if (rePutTimes >= ckRewriteIntervalsInSeconds.length && brokerController.getBrokerConfig().isSkipWhenCKRePutReachMaxTimes()) { + POP_LOGGER.warn("rePut CK reach max times, drop it. {}, {}, {}, {}-{}, {}, {}, {}", oldCK.getTopic(), oldCK.getCId(), + oldCK.getBrokerName(), oldCK.getQueueId(), pair.getObject1(), oldCK.getPopTime(), oldCK.getInvisibleTime(), rePutTimes); return; } @@ -588,7 +588,8 @@ private void rePutCK(PopCheckPoint oldCK, Pair pair) { newCk.setRePutTimes(String.valueOf(rePutTimes + 1)); // always increment even if removed from reviveRequestMap if (oldCK.getReviveTime() <= System.currentTimeMillis()) { // never expect an ACK matched in the future, we just use it to rewrite CK and try to revive retry message next time - newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[rePutTimes] * 1000); + int intervalIndex = rePutTimes >= ckRewriteIntervalsInSeconds.length ? ckRewriteIntervalsInSeconds.length - 1 : rePutTimes; + newCk.setInvisibleTime(oldCK.getInvisibleTime() + ckRewriteIntervalsInSeconds[intervalIndex] * 1000); } MessageExtBrokerInner ckMsg = brokerController.getPopMessageProcessor().buildCkMsg(newCk, queueId); brokerController.getMessageStore().putMessage(ckMsg); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java index d7ea97c5502..603c96bbf91 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java @@ -104,7 +104,6 @@ public void before() { brokerConfig = new BrokerConfig(); brokerConfig.setBrokerClusterName(CLUSTER_NAME); when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); - when(brokerController.getBrokerConfig()).thenReturn(brokerConfig); when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); when(brokerController.getMessageStore()).thenReturn(messageStore); when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager); @@ -285,6 +284,7 @@ public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK() throws @Test public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_end() throws Throwable { + brokerConfig.setSkipWhenCKRePutReachMaxTimes(true); PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); ck.setRePutTimes("17"); PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); @@ -306,6 +306,30 @@ public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_end() th verify(messageStore, times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK } + @Test + public void testReviveMsgFromCk_messageFound_writeRetryFailed_rewriteCK_noEnd() throws Throwable { + brokerConfig.setSkipWhenCKRePutReachMaxTimes(false); + PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); + ck.setRePutTimes(Byte.MAX_VALUE + ""); + PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); + reviveObj.map.put("", ck); + reviveObj.endTime = System.currentTimeMillis(); + StringBuilder actualRetryTopic = new StringBuilder(); + + when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(new MessageExt(), "", false))); + when(escapeBridge.putMessageToSpecificQueue(any(MessageExtBrokerInner.class))).thenAnswer(invocation -> { + MessageExtBrokerInner msg = invocation.getArgument(0); + actualRetryTopic.append(msg.getTopic()); + return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED)); + }); + + popReviveService.mergeAndRevive(reviveObj); + Assert.assertEquals(KeyBuilder.buildPopRetryTopic(TOPIC, GROUP, false), actualRetryTopic.toString()); + verify(escapeBridge, times(1)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry + verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK + } + @Test public void testReviveMsgFromCk_messageNotFound_noRetry() throws Throwable { PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); @@ -349,6 +373,7 @@ public void testReviveMsgFromCk_messageNotFound_needRetry() throws Throwable { @Test public void testReviveMsgFromCk_messageNotFound_needRetry_end() throws Throwable { + brokerConfig.setSkipWhenCKRePutReachMaxTimes(true); PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); ck.setRePutTimes("17"); PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); @@ -363,6 +388,23 @@ public void testReviveMsgFromCk_messageNotFound_needRetry_end() throws Throwable verify(messageStore, times(0)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK } + @Test + public void testReviveMsgFromCk_messageNotFound_needRetry_noEnd() throws Throwable { + brokerConfig.setSkipWhenCKRePutReachMaxTimes(false); + PopCheckPoint ck = buildPopCheckPoint(0, 0, 0); + ck.setRePutTimes(Byte.MAX_VALUE + ""); + PopReviveService.ConsumeReviveObj reviveObj = new PopReviveService.ConsumeReviveObj(); + reviveObj.map.put("", ck); + reviveObj.endTime = System.currentTimeMillis(); + + when(escapeBridge.getMessageAsync(anyString(), anyLong(), anyInt(), anyString(), anyBoolean())) + .thenReturn(CompletableFuture.completedFuture(Triple.of(null, "", true))); + + popReviveService.mergeAndRevive(reviveObj); + verify(escapeBridge, times(0)).putMessageToSpecificQueue(any(MessageExtBrokerInner.class)); // write retry + verify(messageStore, times(1)).putMessage(any(MessageExtBrokerInner.class)); // rewrite CK + } + public static PopCheckPoint buildPopCheckPoint(long startOffset, long popTime, long reviveOffset) { PopCheckPoint ck = new PopCheckPoint(); ck.setStartOffset(startOffset); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 8982e59d03e..10bf7f76e86 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -419,6 +419,9 @@ public class BrokerConfig extends BrokerIdentity { */ private String configBlackList = "configBlackList;brokerConfigPath"; + // if false, will still rewrite ck after max times 17 + private boolean skipWhenCKRePutReachMaxTimes = false; + public String getConfigBlackList() { return configBlackList; } @@ -1826,4 +1829,12 @@ public boolean isEnablePopMessageThreshold() { public void setEnablePopMessageThreshold(boolean enablePopMessageThreshold) { this.enablePopMessageThreshold = enablePopMessageThreshold; } + + public boolean isSkipWhenCKRePutReachMaxTimes() { + return skipWhenCKRePutReachMaxTimes; + } + + public void setSkipWhenCKRePutReachMaxTimes(boolean skipWhenCKRePutReachMaxTimes) { + this.skipWhenCKRePutReachMaxTimes = skipWhenCKRePutReachMaxTimes; + } } From 963b918fe6c5df25a5884ed1f0d12999ac00f1fa Mon Sep 17 00:00:00 2001 From: imzs Date: Thu, 8 Aug 2024 00:02:28 +0800 Subject: [PATCH 3/6] [ISSUE #8460] make getBizMessage() public for test purpose --- .../org/apache/rocketmq/broker/processor/PopReviveService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 6088fd8da39..0d4377c2632 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -199,7 +199,7 @@ private boolean reachTail(PullResult pullResult, long offset) { } // Triple - private CompletableFuture> getBizMessage(String topic, long offset, int queueId, + public CompletableFuture> getBizMessage(String topic, long offset, int queueId, String brokerName) { return this.brokerController.getEscapeBridge().getMessageAsync(topic, offset, queueId, brokerName, false); } From d7ce71efbfa5a6da2ab3d9b8a9809bd18dd99196 Mon Sep 17 00:00:00 2001 From: imzs Date: Thu, 8 Aug 2024 10:23:43 +0800 Subject: [PATCH 4/6] [ISSUE #8460] corner case, add brokerName to mergeKey, to avoid conflict of mergeKey when reviving for another broker --- .../apache/rocketmq/broker/processor/PopReviveService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index 0d4377c2632..cf26f00e5fd 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -358,7 +358,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { if (point.getTopic() == null || point.getCId() == null) { continue; } - map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point); + map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime() + point.getBrokerName(), point); PopMetricsManager.incPopReviveCkGetCount(point, queueId); point.setReviveOffset(messageExt.getQueueOffset()); if (firstRt == 0) { @@ -371,7 +371,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { } AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class); PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId); - String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime(); + String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime() + ackMsg.getBrokerName(); PopCheckPoint point = map.get(mergeKey); if (point == null) { if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) { @@ -396,7 +396,7 @@ protected void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) { BatchAckMsg bAckMsg = JSON.parseObject(raw, BatchAckMsg.class); PopMetricsManager.incPopReviveAckGetCount(bAckMsg, queueId); - String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime(); + String mergeKey = bAckMsg.getTopic() + bAckMsg.getConsumerGroup() + bAckMsg.getQueueId() + bAckMsg.getStartOffset() + bAckMsg.getPopTime() + bAckMsg.getBrokerName(); PopCheckPoint point = map.get(mergeKey); if (point == null) { if (!brokerController.getBrokerConfig().isEnableSkipLongAwaitingAck()) { From f31d5a6ee43ed57577d38694aaf5212ad7ba0d97 Mon Sep 17 00:00:00 2001 From: imzs Date: Thu, 8 Aug 2024 21:45:52 +0800 Subject: [PATCH 5/6] [ISSUE #8460] simplify getBizMessage() params, use PopCheckPoint to replace topic, queueId, brokerName --- .../apache/rocketmq/broker/processor/PopReviveService.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java index cf26f00e5fd..4b141d29102 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java @@ -199,9 +199,8 @@ private boolean reachTail(PullResult pullResult, long offset) { } // Triple - public CompletableFuture> getBizMessage(String topic, long offset, int queueId, - String brokerName) { - return this.brokerController.getEscapeBridge().getMessageAsync(topic, offset, queueId, brokerName, false); + public CompletableFuture> getBizMessage(PopCheckPoint popCheckPoint, long offset) { + return this.brokerController.getEscapeBridge().getMessageAsync(popCheckPoint.getTopic(), offset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName(), false); } public PullResult getMessage(String group, String topic, int queueId, long offset, int nums, @@ -528,7 +527,7 @@ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) { // retry msg long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j); - CompletableFuture> future = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName()) + CompletableFuture> future = getBizMessage(popCheckPoint, msgOffset) .thenApply(rst -> { MessageExt message = rst.getLeft(); if (message == null) { From e9445a09552a54e329148824f443b43b989a8d31 Mon Sep 17 00:00:00 2001 From: imzs Date: Fri, 9 Aug 2024 10:49:37 +0800 Subject: [PATCH 6/6] [ISSUE #8460] fix PopReviveServiceTest unit test failure due to brokerName --- .../apache/rocketmq/broker/processor/PopReviveServiceTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java index 603c96bbf91..3010e836101 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java @@ -428,6 +428,7 @@ public static AckMsg buildAckMsg(long offset, long popTime) { ackMsg.setTopic(TOPIC); ackMsg.setQueueId(0); ackMsg.setPopTime(popTime); + ackMsg.setBrokerName("broker-a"); return ackMsg; }