Skip to content

Commit

Permalink
[fix][client] fix negative message re-delivery twice issue (#20750)
Browse files Browse the repository at this point in the history
  • Loading branch information
aloyszhang authored Jul 17, 2023
1 parent 4c69584 commit ecd16d6
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti
consumer.negativeAcknowledge(msg);
}

assertTrue(consumer instanceof ConsumerBase<String>);
assertEquals(((ConsumerBase<String>) consumer).getUnAckedMessageTracker().size(), 0);

Set<String> receivedMessages = new HashSet<>();

// All the messages should be received again
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat
initReceiverQueueSize();
}

protected UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}

protected void triggerBatchReceiveTimeoutTask() {
if (!hasBatchReceiveTimeout() && batchReceivePolicy.getTimeoutMs() > 0) {
batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ public ConnectionHandler getConnectionHandler() {
return connectionHandler;
}

@Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
Expand Down Expand Up @@ -756,7 +757,7 @@ public void negativeAcknowledge(Message<?> message) {
negativeAcksTracker.add(message);

// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
unAckedMessageTracker.remove(message.getMessageId());
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,7 @@ public void negativeAcknowledge(MessageId messageId) {
checkArgument(messageId instanceof TopicMessageId);
ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());
consumer.negativeAcknowledge(messageId);
unAckedMessageTracker.remove(messageId);
}

@Override
Expand All @@ -554,6 +555,7 @@ public void negativeAcknowledge(Message<?> message) {
checkArgument(messageId instanceof TopicMessageId);
ConsumerImpl<T> consumer = consumers.get(((TopicMessageId) messageId).getOwnerTopic());
consumer.negativeAcknowledge(message);
unAckedMessageTracker.remove(messageId);
}

@Override
Expand Down Expand Up @@ -852,6 +854,7 @@ public synchronized ConsumerStats getStats() {
return stats;
}

@Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RedeliveryBackoff;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class NegativeAcksTracker implements Closeable {
private static final Logger log = LoggerFactory.getLogger(NegativeAcksTracker.class);

private HashMap<MessageId, Long> nackedMessages = null;

Expand Down Expand Up @@ -79,9 +82,12 @@ private synchronized void triggerRedelivery(Timeout t) {
}
});

messagesToRedeliver.forEach(nackedMessages::remove);
consumer.onNegativeAcksSend(messagesToRedeliver);
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
if (!messagesToRedeliver.isEmpty()) {
messagesToRedeliver.forEach(nackedMessages::remove);
consumer.onNegativeAcksSend(messagesToRedeliver);
log.info("[{}] {} messages will be re-delivered", consumer, messagesToRedeliver.size());
consumer.redeliverUnacknowledgedMessages(messagesToRedeliver);
}

this.timeout = timer.newTimeout(this::triggerRedelivery, timerIntervalNanos, TimeUnit.NANOSECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ public void clear() {
}

public boolean add(MessageId messageId) {
if (messageId == null) {
return false;
}

writeLock.lock();
try {
HashSet<MessageId> partition = timePartitions.peekLast();
Expand Down Expand Up @@ -217,6 +221,10 @@ boolean isEmpty() {
}

public boolean remove(MessageId messageId) {
if (messageId == null) {
return false;
}

writeLock.lock();
try {
boolean removed = false;
Expand Down

0 comments on commit ecd16d6

Please sign in to comment.