diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java index 336c3a69e45c1..8173b30c4fea9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/PositionAckSetUtil.java @@ -45,12 +45,18 @@ public static void andAckSet(PositionImpl currentPosition, PositionImpl otherPos if (currentPosition == null || otherPosition == null) { return; } - BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(currentPosition.getAckSet()); - BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(otherPosition.getAckSet()); + currentPosition.setAckSet(andAckSet(currentPosition.getAckSet(), otherPosition.getAckSet())); + } + + //This method is do `and` operation for ack set + public static long[] andAckSet(long[] firstAckSet, long[] secondAckSet) { + BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(firstAckSet); + BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(secondAckSet); thisAckSet.and(otherAckSet); - currentPosition.setAckSet(thisAckSet.toLongArray()); + long[] ackSet = thisAckSet.toLongArray(); thisAckSet.recycle(); otherAckSet.recycle(); + return ackSet; } //This method is compare two position which position is bigger than another one. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index a1437efb8a4d3..ef2fd80302a98 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet; import io.netty.buffer.ByteBuf; import io.prometheus.client.Gauge; import java.util.ArrayList; @@ -37,8 +38,10 @@ import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.service.persistent.CompactorSubscription; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.plugin.EntryFilter; +import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.api.proto.MessageMetadata; @@ -217,8 +220,28 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i batchSizes.setBatchSize(i, batchSize); long[] ackSet = null; if (indexesAcks != null && cursor != null) { + PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); ackSet = cursor - .getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId())); + .getDeletedBatchIndexesAsLongArray(position); + // some batch messages ack bit sit will be in pendingAck state, so don't send all bit sit to consumer + if (subscription instanceof PersistentSubscription + && ((PersistentSubscription) subscription) + .getPendingAckHandle() instanceof PendingAckHandleImpl) { + PositionImpl positionInPendingAck = + ((PersistentSubscription) subscription).getPositionInPendingAck(position); + // if this position not in pendingAck state, don't need to do any op + if (positionInPendingAck != null) { + if (positionInPendingAck.hasAckSet()) { + // need to or ackSet in pendingAck state and cursor ackSet which bit sit has been acked + if (ackSet != null) { + ackSet = andAckSet(ackSet, positionInPendingAck.getAckSet()); + } else { + // if actSet is null, use pendingAck ackSet + ackSet = positionInPendingAck.getAckSet(); + } + } + } + } if (ackSet != null) { indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet)); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 2012aa06b3006..a166f1789257c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -1227,6 +1227,9 @@ public Map getSubscriptionProperties() { return subscriptionProperties; } + public PositionImpl getPositionInPendingAck(PositionImpl position) { + return pendingAckHandle.getPositionInPendingAck(position); + } @Override public CompletableFuture updateSubscriptionProperties(Map subscriptionProperties) { Map newSubscriptionProperties; @@ -1240,7 +1243,6 @@ public CompletableFuture updateSubscriptionProperties(Map this.subscriptionProperties = newSubscriptionProperties; }); } - /** * Return a merged map that contains the cursor properties specified by used * (eg. when using compaction subscription) and the subscription properties. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java index c06e4ebcc1229..a7892a56f0bd5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java @@ -159,6 +159,17 @@ CompletableFuture individualAcknowledgeMessage(TxnID txnID, List + * If it does not return null, it means this Position is in pendingAck and if it is batch Position, + * it will return the corresponding ackSet in pendingAck + * + * @param position {@link Position} witch need to get in pendingAck + * @return {@link Position} return the position in pendingAck + */ + PositionImpl getPositionInPendingAck(PositionImpl position); + /** * Get the stats of this message position is in pending ack. * @param position message position. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java index 3c428106e3eb3..0fc528f880070 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java @@ -103,6 +103,9 @@ public boolean checkIfPendingAckStoreInit() { } @Override + public PositionImpl getPositionInPendingAck(PositionImpl position) { + return null; + } public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) { return null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index ed78feb453d1d..7dbe0385fd7e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -1061,6 +1061,17 @@ public boolean checkIfPendingAckStoreInit() { return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone(); } + @Override + public PositionImpl getPositionInPendingAck(PositionImpl position) { + if (individualAckPositions != null) { + MutablePair positionPair = this.individualAckPositions.get(position); + if (positionPair != null) { + return positionPair.getLeft(); + } + } + return null; + } + protected void handleCacheRequest() { while (true) { Runnable runnable = acceptQueue.poll(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java index 696a0a7957c47..527b8532e0452 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java @@ -120,6 +120,62 @@ public Object[][] enableBatch() { return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } }; } + @Test + private void testIndividualAckAbortFilterAckSetInPendingAckState() throws Exception { + final String topicName = NAMESPACE1 + "/testIndividualAckAbortFilterAckSetInPendingAckState"; + final int count = 9; + Producer producer = pulsarClient + .newProducer(Schema.INT32) + .topic(topicName) + .enableBatching(true) + .batchingMaxPublishDelay(1, TimeUnit.HOURS) + .batchingMaxMessages(count).create(); + + @Cleanup + Consumer consumer = pulsarClient + .newConsumer(Schema.INT32) + .topic(topicName) + .isAckReceiptEnabled(true) + .subscriptionName("test") + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + for (int i = 0; i < count; i++) { + producer.sendAsync(i); + } + + Transaction firstTransaction = getTxn(); + + Transaction secondTransaction = getTxn(); + + // firstTransaction ack the first three messages and don't end the firstTransaction + for (int i = 0; i < count / 3; i++) { + consumer.acknowledgeAsync(consumer.receive().getMessageId(), firstTransaction).get(); + } + + // if secondTransaction abort we only can receive the middle three messages + for (int i = 0; i < count / 3; i++) { + consumer.acknowledgeAsync(consumer.receive().getMessageId(), secondTransaction).get(); + } + + // consumer normal ack the last three messages + for (int i = 0; i < count / 3; i++) { + consumer.acknowledgeAsync(consumer.receive()).get(); + } + + // if secondTransaction abort we only can receive the middle three messages + secondTransaction.abort().get(); + + // can receive 3 4 5 bit sit message + for (int i = 0; i < count / 3; i++) { + assertEquals(consumer.receive().getValue().intValue(), i + 3); + } + + // can't receive message anymore + assertNull(consumer.receive(2, TimeUnit.SECONDS)); + } + @Test(dataProvider="enableBatch") private void produceCommitTest(boolean enableBatch) throws Exception { @Cleanup @@ -674,7 +730,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri admin.topics().delete(normalTopic, true); } - private Transaction getTxn() throws Exception { + public Transaction getTxn() throws Exception { return pulsarClient .newTransaction() .withTransactionTimeout(10, TimeUnit.SECONDS)