Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Enabling batch causes negative unackedMessages due to ack and delivery concurrency #22090

Merged
merged 3 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatch
if (pendingAcks != null) {
int batchSize = batchSizes.getBatchSize(i);
int stickyKeyHash = getStickyKeyHash(entry);
long[] ackSet = getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i);
if (ackSet != null) {
unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,17 @@
*/
package org.apache.pulsar.broker.service;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import com.carrotsearch.hppc.ObjectSet;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
Expand All @@ -28,19 +37,25 @@
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -401,4 +416,171 @@ public void testMixIndexAndNonIndexUnAckMessageCount() throws Exception {
assertEquals(admin.topics().getStats(topicName).getSubscriptions()
.get("sub").getUnackedMessages(), 0);
}

@Test
public void testUnAckMessagesWhenConcurrentDeliveryAndAck() throws Exception {
final String topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/tp");
final String subName = "s1";
final int receiverQueueSize = 500;
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, subName, MessageId.earliest);
ConsumerBuilder<String> consumerBuilder = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.receiverQueueSize(receiverQueueSize)
.subscriptionName(subName)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.isAckReceiptEnabled(true);

// Send 100 messages.
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topicName)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.create();
CompletableFuture<MessageId> lastSent = null;
for (int i = 0; i < 100; i++) {
lastSent = producer.sendAsync(i + "");
}
producer.flush();
lastSent.join();

// When consumer1 is closed, may some messages are in the client memory(it they are being acked now).
Consumer<String> consumer1 = consumerBuilder.consumerName("c1").subscribe();
Message[] messagesInClientMemory = new Message[2];
for (int i = 0; i < 2; i++) {
Message msg = consumer1.receive(2, TimeUnit.SECONDS);
assertNotNull(msg);
messagesInClientMemory[i] = msg;
}
ConsumerImpl<String> consumer2 = (ConsumerImpl<String>) consumerBuilder.consumerName("c2").subscribe();
Awaitility.await().until(() -> consumer2.isConnected());

// The consumer2 will receive messages after consumer1 closed.
// Insert a delay mechanism to make the flow like below:
// 1. Close consumer1, then the 100 messages will be redelivered.
// 2. Read redeliver messages. No messages were acked at this time.
// 3. The in-flight ack of two messages is finished.
// 4. Send the messages to consumer2, consumer2 will get all the 100 messages.
CompletableFuture<Void> receiveMessageSignal2 = new CompletableFuture<>();
org.apache.pulsar.broker.service.Consumer serviceConsumer2 =
makeConsumerReceiveMessagesDelay(topicName, subName, "c2", receiveMessageSignal2);
// step 1: close consumer.
consumer1.close();
// step 2: wait for read messages from replay queue.
Thread.sleep(2 * 1000);
// step 3: wait for the in-flight ack.
BitSetRecyclable bitSetRecyclable = createBitSetRecyclable(100);
long ledgerId = 0, entryId = 0;
for (Message message : messagesInClientMemory) {
BatchMessageIdImpl msgId = (BatchMessageIdImpl) message.getMessageId();
bitSetRecyclable.clear(msgId.getBatchIndex());
ledgerId = msgId.getLedgerId();
entryId = msgId.getEntryId();
}
getCursor(topicName, subName).delete(PositionImpl.get(ledgerId, entryId, bitSetRecyclable.toLongArray()));
// step 4: send messages to consumer2.
receiveMessageSignal2.complete(null);
// Verify: Consumer2 will get all the 100 messages, and "unAckMessages" is 100.
List<Message> messages2 = new ArrayList<>();
while (true) {
Message msg = consumer2.receive(2, TimeUnit.SECONDS);
if (msg == null) {
break;
}
messages2.add(msg);
}
assertEquals(messages2.size(), 100);
assertEquals(serviceConsumer2.getUnackedMessages(), 100);
// After the messages were pop out, the permits in the client memory went to 100.
Awaitility.await().untilAsserted(() -> {
assertEquals(serviceConsumer2.getAvailablePermits() + consumer2.getAvailablePermits(),
receiverQueueSize);
});

// cleanup.
producer.close();
consumer2.close();
admin.topics().delete(topicName, false);
}

private BitSetRecyclable createBitSetRecyclable(int batchSize) {
BitSetRecyclable bitSetRecyclable = new BitSetRecyclable(batchSize);
for (int i = 0; i < batchSize; i++) {
bitSetRecyclable.set(i);
}
return bitSetRecyclable;
}

private ManagedCursorImpl getCursor(String topic, String sub) {
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) persistentTopic.getSubscription(sub).getDispatcher();
return (ManagedCursorImpl) dispatcher.getCursor();
}

/***
* After {@param signal} complete, the consumer({@param consumerName}) start to receive messages.
*/
private org.apache.pulsar.broker.service.Consumer makeConsumerReceiveMessagesDelay(String topic, String sub,
String consumerName,
CompletableFuture<Void> signal) throws Exception {
PersistentTopic persistentTopic =
(PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get();
PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) persistentTopic.getSubscription(sub).getDispatcher();
org.apache.pulsar.broker.service.Consumer serviceConsumer = null;
for (org.apache.pulsar.broker.service.Consumer c : dispatcher.getConsumers()){
if (c.consumerName().equals(consumerName)) {
serviceConsumer = c;
break;
}
}
final org.apache.pulsar.broker.service.Consumer originalConsumer = serviceConsumer;

// Insert a delay signal.
org.apache.pulsar.broker.service.Consumer spyServiceConsumer = spy(originalConsumer);
doAnswer(invocation -> {
List<? extends Entry> entries = (List<? extends Entry>) invocation.getArguments()[0];
EntryBatchSizes batchSizes = (EntryBatchSizes) invocation.getArguments()[1];
EntryBatchIndexesAcks batchIndexesAcks = (EntryBatchIndexesAcks) invocation.getArguments()[2];
int totalMessages = (int) invocation.getArguments()[3];
long totalBytes = (long) invocation.getArguments()[4];
long totalChunkedMessages = (long) invocation.getArguments()[5];
RedeliveryTracker redeliveryTracker = (RedeliveryTracker) invocation.getArguments()[6];
return signal.thenApply(__ -> originalConsumer.sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages, totalBytes,
totalChunkedMessages, redeliveryTracker)).join();
}).when(spyServiceConsumer)
.sendMessages(anyList(), any(), any(), anyInt(), anyLong(), anyLong(), any());
doAnswer(invocation -> {
List<? extends Entry> entries = (List<? extends Entry>) invocation.getArguments()[0];
EntryBatchSizes batchSizes = (EntryBatchSizes) invocation.getArguments()[1];
EntryBatchIndexesAcks batchIndexesAcks = (EntryBatchIndexesAcks) invocation.getArguments()[2];
int totalMessages = (int) invocation.getArguments()[3];
long totalBytes = (long) invocation.getArguments()[4];
long totalChunkedMessages = (long) invocation.getArguments()[5];
RedeliveryTracker redeliveryTracker = (RedeliveryTracker) invocation.getArguments()[6];
long epoch = (long) invocation.getArguments()[7];
return signal.thenApply(__ -> originalConsumer.sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages, totalBytes,
totalChunkedMessages, redeliveryTracker, epoch)).join();
}).when(spyServiceConsumer)
.sendMessages(anyList(), any(), any(), anyInt(), anyLong(), anyLong(), any(), anyLong());

// Replace the consumer.
Field fConsumerList = AbstractDispatcherMultipleConsumers.class.getDeclaredField("consumerList");
Field fConsumerSet = AbstractDispatcherMultipleConsumers.class.getDeclaredField("consumerSet");
fConsumerList.setAccessible(true);
fConsumerSet.setAccessible(true);
List<org.apache.pulsar.broker.service.Consumer> consumerList =
(List<org.apache.pulsar.broker.service.Consumer>) fConsumerList.get(dispatcher);
ObjectSet<org.apache.pulsar.broker.service.Consumer> consumerSet =
(ObjectSet<org.apache.pulsar.broker.service.Consumer>) fConsumerSet.get(dispatcher);

consumerList.remove(originalConsumer);
consumerSet.removeAll(originalConsumer);
consumerList.add(spyServiceConsumer);
consumerSet.add(spyServiceConsumer);
return originalConsumer;
}
}
Loading