Skip to content

Commit

Permalink
[ISSUE apache#6163] Fix the issue of infinite retry of order message (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
RongtongJin authored Feb 23, 2023
1 parent 527350d commit 7cfffe7
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,16 @@ private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, Remoti
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
// Using '>' instead of '>=' to compatible with the case that reconsumeTimes here are increased by client.
if (reconsumeTimes > maxReconsumeTimes) {

boolean sendRetryMessageToDeadLetterQueueDirectly = false;
if (!brokerController.getRebalanceLockManager().isLockAllExpired(groupName)) {
LOGGER.info("Group has unexpired lock record, which show it is ordered message, send it to DLQ "
+ "right now group={}, topic={}, reconsumeTimes={}, maxReconsumeTimes={}.", groupName,
newTopic, reconsumeTimes, maxReconsumeTimes);
sendRetryMessageToDeadLetterQueueDirectly = true;
}

if (reconsumeTimes > maxReconsumeTimes || sendRetryMessageToDeadLetterQueueDirectly) {
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_CONSUMER_GROUP, requestHeader.getProducerGroup())
.put(LABEL_TOPIC, requestHeader.getTopic())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public boolean sendMessageBack(final MessageExt msg) {
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
Expand Down

0 comments on commit 7cfffe7

Please sign in to comment.