diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java index 6073023722a..47ef8e4013b 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java @@ -551,18 +551,24 @@ private CompletableFuture popMsgFromQueue(String topic, String attemptId, future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey)); offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(), true, lockKey, true); - if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic, - requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) { - future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum); - return future; - } + // Current requests would calculate the total number of messages + // waiting to be filtered for new message arrival notifications in + // the long-polling service, need disregarding the backlog in order + // consumption scenario. If rest message num including the blocked + // queue accumulation would lead to frequent unnecessary wake-ups + // of long-polling requests, resulting unnecessary CPU usage. + // When client ack message, long-polling request would be notifications + // by AckMessageProcessor.ackOrderly() and message will not be delayed. if (isOrder) { + if (brokerController.getConsumerOrderInfoManager().checkBlock( + attemptId, topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) { + // should not add accumulation(max offset - consumer offset) here + future.complete(restNum); + return future; + } this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum( - topic, - requestHeader.getConsumerGroup(), - queueId - ); + topic, requestHeader.getConsumerGroup(), queueId); } if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {