Skip to content

Commit

Permalink
[fix][broker] Fix incorrect unack count when using shared subscriptio…
Browse files Browse the repository at this point in the history
…n on non-persistent topic (#21592)

Fixes #21568

Motivation
Fix incorrect unack count when using shared subscription on non-persistent topic

Modifications
In the case of a non-persistent topic, the consumer does not send an ack to the broker (see org.apache.pulsar.client.impl.NonPersistentAcknowledgmentGroupingTracker# addAcknowledgment)

To work around this, we can update unackedMessages when the broker sends a message to the consumer successfully.
  • Loading branch information
1Jack2 authored and Technoboy- committed Dec 4, 2023
1 parent 4aabd42 commit d458b80
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ public class Consumer {
private final ConsumerStatsImpl stats;

private final boolean isDurable;

private final boolean isPersistentTopic;

private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages");
private volatile int unackedMessages = 0;
Expand Down Expand Up @@ -172,6 +175,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.readCompacted = readCompacted;
this.consumerName = consumerName;
this.isDurable = isDurable;
this.isPersistentTopic = subscription.getTopic() instanceof PersistentTopic;
this.keySharedMeta = keySharedMeta;
this.cnx = cnx;
this.msgOut = new Rate();
Expand Down Expand Up @@ -239,6 +243,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
this.pendingAcks = null;
this.stats = null;
this.isDurable = false;
this.isPersistentTopic = false;
this.metadata = null;
this.keySharedMeta = null;
this.clientAddress = null;
Expand Down Expand Up @@ -1073,7 +1078,7 @@ public Subscription getSubscription() {

private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) {
int unackedMsgs = 0;
if (Subscription.isIndividualAckMode(subType)) {
if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) {
subscription.addUnAckedMessages(ackedMessages);
unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,4 +446,37 @@ public void testAvgMessagesPerEntry() throws Exception {
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
assertEquals(3, avgMessagesPerEntry);
}

@Test()
public void testNonPersistentTopicSharedSubscriptionUnackedMessages() throws Exception {
final String topicName = "non-persistent://my-property/my-ns/my-topic" + UUID.randomUUID();
final String subName = "my-sub";

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

for (int i = 0; i < 5; i++) {
producer.send(("message-" + i).getBytes());
}
for (int i = 0; i < 5; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledge(msg);
}
TimeUnit.SECONDS.sleep(1);

TopicStats topicStats = admin.topics().getStats(topicName);
assertEquals(1, topicStats.getSubscriptions().size());
List<? extends ConsumerStats> consumers = topicStats.getSubscriptions().get(subName).getConsumers();
assertEquals(1, consumers.size());
assertEquals(0, consumers.get(0).getUnackedMessages());
}

}

0 comments on commit d458b80

Please sign in to comment.