From 25495421c3846915a28c17ce30806e0a5c64e8b9 Mon Sep 17 00:00:00 2001 From: "kaiyi.lk" Date: Thu, 4 May 2023 18:09:39 +0800 Subject: [PATCH] [ISSUE #6691] Support reentrant pop orderly for broker --- .../offset/ConsumerOrderInfoManager.java | 37 +++++++++--- .../broker/processor/AckMessageProcessor.java | 2 +- .../processor/NotificationProcessor.java | 2 +- .../broker/processor/PopMessageProcessor.java | 14 ++--- ...merOrderInfoManagerLockFreeNotifyTest.java | 5 ++ .../offset/ConsumerOrderInfoManagerTest.java | 57 +++++++++++++++---- .../header/PopMessageRequestHeader.java | 11 ++++ .../test/client/rmq/RMQPopClient.java | 7 +++ .../client/consumer/pop/BasePopOrderly.java | 19 ++++++- .../client/consumer/pop/PopOrderlyIT.java | 38 +++++++++++++ .../test/offset/OffsetResetForPopIT.java | 8 +-- 11 files changed, 167 insertions(+), 33 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java index 29bbe99701f..2e2850dbbc4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java @@ -91,7 +91,7 @@ private void updateLockFreeTimestamp(String topic, String group, int queueId, Or * @param msgQueueOffsetList the queue offsets of messages * @param orderInfoBuilder will append order info to this builder */ - public void update(boolean isRetry, String topic, String group, int queueId, long popTime, long invisibleTime, + public void update(String attemptId, boolean isRetry, String topic, String group, int queueId, long popTime, long invisibleTime, List msgQueueOffsetList, StringBuilder orderInfoBuilder) { String key = buildKey(topic, group); ConcurrentHashMap qs = table.get(key); @@ -106,12 +106,12 @@ public void update(boolean isRetry, String topic, String group, int queueId, lon OrderInfo orderInfo = qs.get(queueId); if (orderInfo != null) { - OrderInfo newOrderInfo = new OrderInfo(popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0); - newOrderInfo.mergeOffsetConsumedCount(orderInfo.offsetList, orderInfo.offsetConsumedCount); + OrderInfo newOrderInfo = new OrderInfo(attemptId, popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0); + newOrderInfo.mergeOffsetConsumedCount(orderInfo.attemptId, orderInfo.offsetList, orderInfo.offsetConsumedCount); orderInfo = newOrderInfo; } else { - orderInfo = new OrderInfo(popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0); + orderInfo = new OrderInfo(attemptId, popTime, invisibleTime, msgQueueOffsetList, System.currentTimeMillis(), 0); } qs.put(queueId, orderInfo); @@ -140,7 +140,7 @@ public void update(boolean isRetry, String topic, String group, int queueId, lon updateLockFreeTimestamp(topic, group, queueId, orderInfo); } - public boolean checkBlock(String topic, String group, int queueId, long invisibleTime) { + public boolean checkBlock(String attemptId, String topic, String group, int queueId, long invisibleTime) { String key = buildKey(topic, group); ConcurrentHashMap qs = table.get(key); if (qs == null) { @@ -156,7 +156,7 @@ public boolean checkBlock(String topic, String group, int queueId, long invisibl if (orderInfo == null) { return false; } - return orderInfo.needBlock(invisibleTime); + return orderInfo.needBlock(attemptId, invisibleTime); } public void clearBlock(String topic, String group, int queueId) { @@ -391,17 +391,20 @@ public static class OrderInfo { */ @JSONField(name = "cm") private long commitOffsetBit; + @JSONField(name = "a") + private String attemptId; public OrderInfo() { } - public OrderInfo(long popTime, long invisibleTime, List queueOffsetList, long lastConsumeTimestamp, + public OrderInfo(String attemptId, long popTime, long invisibleTime, List queueOffsetList, long lastConsumeTimestamp, long commitOffsetBit) { this.popTime = popTime; this.invisibleTime = invisibleTime; this.offsetList = buildOffsetList(queueOffsetList); this.lastConsumeTimestamp = lastConsumeTimestamp; this.commitOffsetBit = commitOffsetBit; + this.attemptId = attemptId; } public List getOffsetList() { @@ -460,6 +463,14 @@ public void setOffsetConsumedCount(Map offsetConsumedCount) { this.offsetConsumedCount = offsetConsumedCount; } + public String getAttemptId() { + return attemptId; + } + + public void setAttemptId(String attemptId) { + this.attemptId = attemptId; + } + public static List buildOffsetList(List queueOffsetList) { List simple = new ArrayList<>(); if (queueOffsetList.size() == 1) { @@ -475,10 +486,13 @@ public static List buildOffsetList(List queueOffsetList) { } @JSONField(serialize = false, deserialize = false) - public boolean needBlock(long currentInvisibleTime) { + public boolean needBlock(String attemptId, long currentInvisibleTime) { if (offsetList == null || offsetList.isEmpty()) { return false; } + if (this.attemptId != null && this.attemptId.equals(attemptId)) { + return false; + } int num = offsetList.size(); int i = 0; if (this.invisibleTime == null || this.invisibleTime <= 0) { @@ -586,11 +600,15 @@ public boolean isNotAck(int offsetIndex) { * @param prevOffsetConsumedCount the offset list of message */ @JSONField(serialize = false, deserialize = false) - public void mergeOffsetConsumedCount(List preOffsetList, Map prevOffsetConsumedCount) { + public void mergeOffsetConsumedCount(String preAttemptId, List preOffsetList, Map prevOffsetConsumedCount) { Map offsetConsumedCount = new HashMap<>(); if (prevOffsetConsumedCount == null) { prevOffsetConsumedCount = new HashMap<>(); } + if (preAttemptId != null && preAttemptId.equals(this.attemptId)) { + this.offsetConsumedCount = prevOffsetConsumedCount; + return; + } Set preQueueOffsetSet = new HashSet<>(); for (int i = 0; i < preOffsetList.size(); i++) { preQueueOffsetSet.add(getQueueOffset(preOffsetList, i)); @@ -619,6 +637,7 @@ public String toString() { .add("offsetConsumedCount", offsetConsumedCount) .add("lastConsumeTimestamp", lastConsumeTimestamp) .add("commitOffsetBit", commitOffsetBit) + .add("attemptId", attemptId) .toString(); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java index 824ba48fc65..fa1c0793e42 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java @@ -178,7 +178,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re this.brokerController.getConsumerOffsetManager().commitOffset(channel.remoteAddress().toString(), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), nextOffset); } - if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(), + if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId(), invisibleTime)) { this.brokerController.getPopMessageProcessor().notifyMessageArriving( requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueId()); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java index 3b306ca2d16..4be77468f12 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java @@ -264,7 +264,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re } private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader requestHeader, int queueId) { - if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) { + if (this.brokerController.getConsumerOrderInfoManager().checkBlock(null, requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) { return false; } String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index d63fbe62129..8e51b00dd07 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -416,7 +416,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re if (retryTopicConfig != null) { for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } } @@ -425,12 +425,12 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re // read all queue for (int i = 0; i < topicConfig.getReadQueueNums(); i++) { int queueId = (randomQ + i) % topicConfig.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } } else { int queueId = requestHeader.getQueueId(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), false, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } // if not full , fetch retry again @@ -440,7 +440,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re if (retryTopicConfig != null) { for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) { int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums(); - getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, + getMessageFuture = getMessageFuture.thenCompose(restNum -> popMsgFromQueue(requestHeader.getAttemptId(), true, getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, finalMessageFilter, startOffsetInfo, msgOffsetInfo, finalOrderCountInfo)); } } @@ -523,7 +523,7 @@ private RemotingCommand processRequest(final Channel channel, RemotingCommand re return null; } - private CompletableFuture popMsgFromQueue(boolean isRetry, GetMessageResult getMessageResult, + private CompletableFuture popMsgFromQueue(String attemptId, boolean isRetry, GetMessageResult getMessageResult, PopMessageRequestHeader requestHeader, int queueId, long restNum, int reviveQid, Channel channel, long popTime, ExpressionMessageFilter messageFilter, StringBuilder startOffsetInfo, StringBuilder msgOffsetInfo, StringBuilder orderCountInfo) { @@ -545,7 +545,7 @@ private CompletableFuture popMsgFromQueue(boolean isRetry, GetMessageResul future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(), true, lockKey, true); - if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(topic, + if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) { future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum); return future; @@ -618,7 +618,7 @@ private CompletableFuture popMsgFromQueue(boolean isRetry, GetMessageResul BrokerMetricsManager.throughputOutTotal.add(result.getBufferTotalSize(), attributes); if (isOrder) { - this.brokerController.getConsumerOrderInfoManager().update(isRetry, topic, + this.brokerController.getConsumerOrderInfoManager().update(requestHeader.getAttemptId(), isRetry, topic, requestHeader.getConsumerGroup(), queueId, popTime, requestHeader.getInvisibleTime(), result.getMessageQueueOffset(), orderCountInfo); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java index e5033a05d96..93689efa586 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerLockFreeNotifyTest.java @@ -67,6 +67,7 @@ public void before() { @Test public void testConsumeMessageThenNoAck() { consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -83,6 +84,7 @@ public void testConsumeMessageThenNoAck() { @Test public void testConsumeMessageThenAck() { consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -106,6 +108,7 @@ public void testConsumeMessageThenAck() { @Test public void testConsumeTheChangeInvisibleLonger() { consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -130,6 +133,7 @@ public void testConsumeTheChangeInvisibleLonger() { @Test public void testConsumeTheChangeInvisibleShorter() { consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -155,6 +159,7 @@ public void testConsumeTheChangeInvisibleShorter() { public void testRecover() { ConsumerOrderInfoManager savedConsumerOrderInfoManager = new ConsumerOrderInfoManager(); savedConsumerOrderInfoManager.update( + null, false, TOPIC, GROUP, diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java index f260632c664..25b418c9344 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -63,6 +64,7 @@ public void before() { @Test public void testCommitAndNext() { consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -82,6 +84,7 @@ public void testCommitAndNext() { )); assertEncodeAndDecode(); assertTrue(consumerOrderInfoManager.checkBlock( + null, TOPIC, GROUP, QUEUE_ID_0, @@ -97,6 +100,7 @@ public void testCommitAndNext() { )); assertEncodeAndDecode(); assertFalse(consumerOrderInfoManager.checkBlock( + null, TOPIC, GROUP, QUEUE_ID_0, @@ -110,6 +114,7 @@ public void testConsumedCount() { // consume three new messages StringBuilder orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -129,6 +134,7 @@ public void testConsumedCount() { // reconsume same messages StringBuilder orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -151,6 +157,7 @@ public void testConsumedCount() { // reconsume last two message StringBuilder orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -173,6 +180,7 @@ public void testConsumedCount() { // consume a new message and reconsume last message StringBuilder orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -193,6 +201,7 @@ public void testConsumedCount() { // consume two new messages StringBuilder orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -215,6 +224,7 @@ public void testConsumedCountForMultiQueue() { // consume two new messages StringBuilder orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -225,6 +235,7 @@ public void testConsumedCountForMultiQueue() { orderInfoBuilder ); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -244,6 +255,7 @@ public void testConsumedCountForMultiQueue() { // reconsume two message StringBuilder orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -254,6 +266,7 @@ public void testConsumedCountForMultiQueue() { orderInfoBuilder ); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -275,6 +288,7 @@ public void testConsumedCountForMultiQueue() { // reconsume with a new message StringBuilder orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -285,6 +299,7 @@ public void testConsumedCountForMultiQueue() { orderInfoBuilder ); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -311,6 +326,7 @@ public void testUpdateNextVisibleTime() { StringBuilder orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -329,10 +345,11 @@ public void testUpdateNextVisibleTime() { assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 3L, popTime)); assertEncodeAndDecode(); - await().atMost(Duration.ofSeconds(invisibleTime + 1)).until(() -> !consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime)); + await().atMost(Duration.ofSeconds(invisibleTime + 1)).until(() -> !consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, invisibleTime)); orderInfoBuilder = new StringBuilder(); consumerOrderInfoManager.update( + null, false, TOPIC, GROUP, @@ -350,11 +367,11 @@ public void testUpdateNextVisibleTime() { assertEncodeAndDecode(); assertEquals(2, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 4L, popTime)); assertEncodeAndDecode(); - assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime)); + assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, invisibleTime)); assertEquals(5L, consumerOrderInfoManager.commitAndNext(TOPIC, GROUP, QUEUE_ID_0, 2L, popTime)); assertEncodeAndDecode(); - assertFalse(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, invisibleTime)); + assertFalse(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, invisibleTime)); } @Test @@ -377,7 +394,7 @@ public void testAutoCleanAndEncode() { ConsumerOrderInfoManager consumerOrderInfoManager = new ConsumerOrderInfoManager(brokerController); { - consumerOrderInfoManager.update(false, + consumerOrderInfoManager.update(null, false, "errTopic", "errGroup", QUEUE_ID_0, @@ -390,7 +407,7 @@ public void testAutoCleanAndEncode() { assertEquals(0, consumerOrderInfoManager.getTable().size()); } { - consumerOrderInfoManager.update(false, + consumerOrderInfoManager.update(null, false, TOPIC, "errGroup", QUEUE_ID_0, @@ -404,7 +421,7 @@ public void testAutoCleanAndEncode() { } { topicConfig.setReadQueueNums(0); - consumerOrderInfoManager.update(false, + consumerOrderInfoManager.update(null, false, TOPIC, GROUP, QUEUE_ID_0, @@ -420,7 +437,7 @@ public void testAutoCleanAndEncode() { } { topicConfig.setReadQueueNums(8); - consumerOrderInfoManager.update(false, + consumerOrderInfoManager.update(null, false, TOPIC, GROUP, QUEUE_ID_0, @@ -461,7 +478,7 @@ private void assertEncodeAndDecode() { @Test public void testLoadFromOldVersionOrderInfoData() { - consumerOrderInfoManager.update(false, + consumerOrderInfoManager.update(null, false, TOPIC, GROUP, QUEUE_ID_0, @@ -479,10 +496,10 @@ public void testLoadFromOldVersionOrderInfoData() { String dataEncoded = consumerOrderInfoManager.encode(); consumerOrderInfoManager.decode(dataEncoded); - assertTrue(consumerOrderInfoManager.checkBlock(TOPIC, GROUP, QUEUE_ID_0, 3000)); + assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, 3000)); StringBuilder orderInfoBuilder = new StringBuilder(); - consumerOrderInfoManager.update(false, + consumerOrderInfoManager.update(null, false, TOPIC, GROUP, QUEUE_ID_0, @@ -497,4 +514,24 @@ public void testLoadFromOldVersionOrderInfoData() { assertEquals(1, orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 3)).intValue()); assertEquals(1, orderInfoMap.get(ExtraInfoUtil.getQueueOffsetMapKey(TOPIC, QUEUE_ID_0, 4)).intValue()); } + + @Test + public void testReentrant() { + StringBuilder orderInfoBuilder = new StringBuilder(); + String attemptId = UUID.randomUUID().toString(); + consumerOrderInfoManager.update( + attemptId, + false, + TOPIC, + GROUP, + QUEUE_ID_0, + popTime, + 3000, + Lists.newArrayList(1L, 2L, 3L), + orderInfoBuilder + ); + + assertTrue(consumerOrderInfoManager.checkBlock(null, TOPIC, GROUP, QUEUE_ID_0, 3000)); + assertFalse(consumerOrderInfoManager.checkBlock(attemptId, TOPIC, GROUP, QUEUE_ID_0, 3000)); + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java index 2460a4f2e38..34b97987ddd 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/PopMessageRequestHeader.java @@ -50,6 +50,8 @@ public class PopMessageRequestHeader extends TopicQueueRequestHeader { */ private Boolean order = Boolean.FALSE; + private String attemptId; + @Override public void checkFields() throws RemotingCommandException { } @@ -154,6 +156,14 @@ public boolean isOrder() { return this.order != null && this.order.booleanValue(); } + public String getAttemptId() { + return attemptId; + } + + public void setAttemptId(String attemptId) { + this.attemptId = attemptId; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -168,6 +178,7 @@ public String toString() { .add("expType", expType) .add("exp", exp) .add("order", order) + .add("attemptId", attemptId) .toString(); } } diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java index b0c8c325061..85dfa7b4945 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQPopClient.java @@ -69,6 +69,12 @@ public void shutdown() { public CompletableFuture popMessageAsync(String brokerAddr, MessageQueue mq, long invisibleTime, int maxNums, String consumerGroup, long timeout, boolean poll, int initMode, boolean order, String expressionType, String expression) { + return popMessageAsync(brokerAddr, mq, invisibleTime, maxNums, consumerGroup, timeout, poll, initMode, order, expressionType, expression, null); + } + + public CompletableFuture popMessageAsync(String brokerAddr, MessageQueue mq, long invisibleTime, + int maxNums, String consumerGroup, long timeout, boolean poll, int initMode, boolean order, + String expressionType, String expression, String attemptId) { PopMessageRequestHeader requestHeader = new PopMessageRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); requestHeader.setTopic(mq.getTopic()); @@ -79,6 +85,7 @@ public CompletableFuture popMessageAsync(String brokerAddr, MessageQu requestHeader.setExpType(expressionType); requestHeader.setExp(expression); requestHeader.setOrder(order); + requestHeader.setAttemptId(attemptId); if (poll) { requestHeader.setPollTime(timeout); requestHeader.setBornTime(System.currentTimeMillis()); diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java index ecd70c1343e..acf70f7f94a 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/BasePopOrderly.java @@ -95,6 +95,19 @@ protected void assertConsumeTimes(List msgRcvList) { } } + protected void assertMsgRecv(int seqId, int expectNum, List expectReconsumeTimes) { + String msgId = msgRecvSequence.get(seqId); + List msgRcvList = msgRecv.get(msgId); + assertEquals(expectNum, msgRcvList.size()); + assertConsumeTimes(msgRcvList, expectReconsumeTimes); + } + + protected void assertConsumeTimes(List msgRcvList, List expectReconsumeTimes) { + for (int i = 0; i < msgRcvList.size(); i++) { + assertEquals(expectReconsumeTimes.get(i).intValue(), msgRcvList.get(i).messageExt.getReconsumeTimes()); + } + } + protected void onRecvNewMessage(MessageExt messageExt) { msgDataRecv.add(new String(messageExt.getBody())); msgRecvSequence.add(messageExt.getMsgId()); @@ -108,9 +121,13 @@ protected void onRecvNewMessage(MessageExt messageExt) { } protected CompletableFuture popMessageOrderlyAsync(long invisibleTime, int maxNums, long timeout) { + return popMessageOrderlyAsync(invisibleTime, maxNums, timeout, null); + } + + protected CompletableFuture popMessageOrderlyAsync(long invisibleTime, int maxNums, long timeout, String attemptId) { return client.popMessageAsync( brokerAddr, messageQueue, invisibleTime, maxNums, group, timeout, true, - ConsumeInitMode.MIN, true, ExpressionType.TAG, "*"); + ConsumeInitMode.MIN, true, ExpressionType.TAG, "*", attemptId); } protected CompletableFuture ackMessageAsync(MessageExt messageExt) { diff --git a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java index 04c7f4a349b..efb12a321d5 100644 --- a/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/client/consumer/pop/PopOrderlyIT.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.PopResult; import org.apache.rocketmq.common.message.MessageExt; +import org.assertj.core.util.Lists; import org.junit.Test; import static org.awaitility.Awaitility.await; @@ -248,4 +249,41 @@ private CompletableFuture popMessageOrderlyThenChangeInvisibleTimeMidMessa }); return resultFuture; } + + @Test + public void testReentrant() { + producer.send(1); + + popMessageForReentrant(null).join(); + assertMsgRecv(0, 1, Lists.newArrayList(0)); + + String attemptId01 = "attemptId-01"; + popMessageForReentrant(attemptId01).join(); + assertMsgRecv(0, 2, Lists.newArrayList(0, 1)); + popMessageForReentrant(attemptId01).join(); + assertMsgRecv(0, 3, Lists.newArrayList(0, 1, 1)); + + String attemptId02 = "attemptId-02"; + await().atLeast(Duration.ofSeconds(5)).atMost(Duration.ofSeconds(15)).until(() -> { + popMessageForReentrant(attemptId02).join(); + return msgRecvSequence.size() == 4; + }); + popMessageForReentrant(attemptId02).join(); + assertMsgRecv(0, 5, Lists.newArrayList(0, 1, 1, 2, 2)); + + await().atLeast(Duration.ofSeconds(5)).atMost(Duration.ofSeconds(15)).until(() -> { + popMessageForReentrant(null).join(); + return msgRecvSequence.size() == 6; + }); + assertMsgRecv(0, 6, Lists.newArrayList(0, 1, 1, 2, 2, 3)); + } + + private CompletableFuture popMessageForReentrant(String attemptId) { + return popMessageOrderlyAsync(TimeUnit.SECONDS.toMillis(10), 3, TimeUnit.SECONDS.toMillis(30), attemptId) + .thenAccept(popResult -> { + for (MessageExt messageExt : popResult.getMsgFoundList()) { + onRecvNewMessage(messageExt); + } + }); + } } diff --git a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java index cedc0fe2aa4..b9798cfd5a4 100644 --- a/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/offset/OffsetResetForPopIT.java @@ -155,12 +155,12 @@ public void testResetOffsetThenAckOldForPopOrderly() throws Exception { // ack old msg, expect has no effect ackMessageSync(popResult1.getMsgFoundList()); Assert.assertTrue(brokerController1.getConsumerOrderInfoManager() - .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME)); + .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME)); // ack new msg ackMessageSync(popResult2.getMsgFoundList()); Assert.assertFalse(brokerController1.getConsumerOrderInfoManager() - .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME)); + .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME)); } @Test @@ -176,12 +176,12 @@ public void testRestOffsetToSkipMsgForPopOrderly() throws Exception { PopResult popResult = consumer.popOrderly(brokerController1.getBrokerAddr(), mq); Assert.assertEquals(messageCount - resetOffset, popResult.getMsgFoundList().size()); Assert.assertTrue(brokerController1.getConsumerOrderInfoManager() - .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME)); + .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME)); ackMessageSync(popResult.getMsgFoundList()); TimeUnit.SECONDS.sleep(1); Assert.assertFalse(brokerController1.getConsumerOrderInfoManager() - .checkBlock(topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME)); + .checkBlock(null, topic, group, 0, RMQPopConsumer.DEFAULT_INVISIBLE_TIME)); } @Test