Skip to content

Commit

Permalink
[ISSUE #8592] Not notify long polling request when pop orderly consum…
Browse files Browse the repository at this point in the history
…e blocked (#8593)
  • Loading branch information
lizhimins committed Aug 29, 2024
1 parent 00a05a5 commit 18c30cb
Showing 1 changed file with 15 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -551,18 +551,24 @@ private CompletableFuture<Long> popMsgFromQueue(String topic, String attemptId,
future.whenComplete((result, throwable) -> queueLockManager.unLock(lockKey));
offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInitMode(),
true, lockKey, true);
if (isOrder && brokerController.getConsumerOrderInfoManager().checkBlock(attemptId, topic,
requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
future.complete(this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset + restNum);
return future;
}

// Current requests would calculate the total number of messages
// waiting to be filtered for new message arrival notifications in
// the long-polling service, need disregarding the backlog in order
// consumption scenario. If rest message num including the blocked
// queue accumulation would lead to frequent unnecessary wake-ups
// of long-polling requests, resulting unnecessary CPU usage.
// When client ack message, long-polling request would be notifications
// by AckMessageProcessor.ackOrderly() and message will not be delayed.
if (isOrder) {
if (brokerController.getConsumerOrderInfoManager().checkBlock(
attemptId, topic, requestHeader.getConsumerGroup(), queueId, requestHeader.getInvisibleTime())) {
// should not add accumulation(max offset - consumer offset) here
future.complete(restNum);
return future;
}
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNum(
topic,
requestHeader.getConsumerGroup(),
queueId
);
topic, requestHeader.getConsumerGroup(), queueId);
}

if (getMessageResult.getMessageMapedList().size() >= requestHeader.getMaxMsgNums()) {
Expand Down

0 comments on commit 18c30cb

Please sign in to comment.