Skip to content

Commit

Permalink
[feature][txn] Fix individual ack batch message with transaction abor…
Browse files Browse the repository at this point in the history
…t redevlier duplicate messages (apache#14327)

### Motivation
If individual ack batch message with transaction and abort this transaction, we will redeliver this message. but this batch message some bit sit are acked by another transaction and re consume this bit sit will produce `TransactionConflictException`, we don't need to redeliver this bit sit witch is acked by another transaction.

if batch have batch size 5

1. txn1 ack 0, 1     the ackSet is   00111
2. txn2 ack 2 3 4 the ack Set is  11000
3. abort txn2 redeliver this position is 00111
4. but now we don't filter txn1 ackSet so redeliver this position bitSet is 111111
### Modifications
When filter the message we should filter the bit sit witch is real ack or in pendingAck state
### Verifying this change
add the test
  • Loading branch information
congbobo184 committed Feb 17, 2023
1 parent 66fda61 commit e0c0d5e
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,18 @@ public static void andAckSet(PositionImpl currentPosition, PositionImpl otherPos
if (currentPosition == null || otherPosition == null) {
return;
}
BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(currentPosition.getAckSet());
BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(otherPosition.getAckSet());
currentPosition.setAckSet(andAckSet(currentPosition.getAckSet(), otherPosition.getAckSet()));
}

//This method is do `and` operation for ack set
public static long[] andAckSet(long[] firstAckSet, long[] secondAckSet) {
BitSetRecyclable thisAckSet = BitSetRecyclable.valueOf(firstAckSet);
BitSetRecyclable otherAckSet = BitSetRecyclable.valueOf(secondAckSet);
thisAckSet.and(otherAckSet);
currentPosition.setAckSet(thisAckSet.toLongArray());
long[] ackSet = thisAckSet.toLongArray();
thisAckSet.recycle();
otherAckSet.recycle();
return ackSet;
}

//This method is compare two position which position is bigger than another one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.Gauge;
import java.util.ArrayList;
Expand All @@ -37,8 +38,10 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.service.persistent.CompactorSubscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.MessageMetadata;
Expand Down Expand Up @@ -217,8 +220,28 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
batchSizes.setBatchSize(i, batchSize);
long[] ackSet = null;
if (indexesAcks != null && cursor != null) {
PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId());
ackSet = cursor
.getDeletedBatchIndexesAsLongArray(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
.getDeletedBatchIndexesAsLongArray(position);
// some batch messages ack bit sit will be in pendingAck state, so don't send all bit sit to consumer
if (subscription instanceof PersistentSubscription
&& ((PersistentSubscription) subscription)
.getPendingAckHandle() instanceof PendingAckHandleImpl) {
PositionImpl positionInPendingAck =
((PersistentSubscription) subscription).getPositionInPendingAck(position);
// if this position not in pendingAck state, don't need to do any op
if (positionInPendingAck != null) {
if (positionInPendingAck.hasAckSet()) {
// need to or ackSet in pendingAck state and cursor ackSet which bit sit has been acked
if (ackSet != null) {
ackSet = andAckSet(ackSet, positionInPendingAck.getAckSet());
} else {
// if actSet is null, use pendingAck ackSet
ackSet = positionInPendingAck.getAckSet();
}
}
}
}
if (ackSet != null) {
indexesAcks.setIndexesAcks(i, Pair.of(batchSize, ackSet));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,9 @@ public Map<String, String> getSubscriptionProperties() {
return subscriptionProperties;
}

public PositionImpl getPositionInPendingAck(PositionImpl position) {
return pendingAckHandle.getPositionInPendingAck(position);
}
@Override
public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String> subscriptionProperties) {
Map<String, String> newSubscriptionProperties;
Expand All @@ -1240,7 +1243,6 @@ public CompletableFuture<Void> updateSubscriptionProperties(Map<String, String>
this.subscriptionProperties = newSubscriptionProperties;
});
}

/**
* Return a merged map that contains the cursor properties specified by used
* (eg. when using compaction subscription) and the subscription properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePa
*/
boolean checkIfPendingAckStoreInit();

/**
* If it returns null, it means this Position is not in pendingAck.
* <p>
* If it does not return null, it means this Position is in pendingAck and if it is batch Position,
* it will return the corresponding ackSet in pendingAck
*
* @param position {@link Position} witch need to get in pendingAck
* @return {@link Position} return the position in pendingAck
*/
PositionImpl getPositionInPendingAck(PositionImpl position);

/**
* Get the stats of this message position is in pending ack.
* @param position message position.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ public boolean checkIfPendingAckStoreInit() {
}

@Override
public PositionImpl getPositionInPendingAck(PositionImpl position) {
return null;
}
public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1061,6 +1061,17 @@ public boolean checkIfPendingAckStoreInit() {
return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
}

@Override
public PositionImpl getPositionInPendingAck(PositionImpl position) {
if (individualAckPositions != null) {
MutablePair<PositionImpl, Integer> positionPair = this.individualAckPositions.get(position);
if (positionPair != null) {
return positionPair.getLeft();
}
}
return null;
}

protected void handleCacheRequest() {
while (true) {
Runnable runnable = acceptQueue.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,62 @@ public Object[][] enableBatch() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}

@Test
private void testIndividualAckAbortFilterAckSetInPendingAckState() throws Exception {
final String topicName = NAMESPACE1 + "/testIndividualAckAbortFilterAckSetInPendingAckState";
final int count = 9;
Producer<Integer> producer = pulsarClient
.newProducer(Schema.INT32)
.topic(topicName)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.HOURS)
.batchingMaxMessages(count).create();

@Cleanup
Consumer<Integer> consumer = pulsarClient
.newConsumer(Schema.INT32)
.topic(topicName)
.isAckReceiptEnabled(true)
.subscriptionName("test")
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.subscribe();

for (int i = 0; i < count; i++) {
producer.sendAsync(i);
}

Transaction firstTransaction = getTxn();

Transaction secondTransaction = getTxn();

// firstTransaction ack the first three messages and don't end the firstTransaction
for (int i = 0; i < count / 3; i++) {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), firstTransaction).get();
}

// if secondTransaction abort we only can receive the middle three messages
for (int i = 0; i < count / 3; i++) {
consumer.acknowledgeAsync(consumer.receive().getMessageId(), secondTransaction).get();
}

// consumer normal ack the last three messages
for (int i = 0; i < count / 3; i++) {
consumer.acknowledgeAsync(consumer.receive()).get();
}

// if secondTransaction abort we only can receive the middle three messages
secondTransaction.abort().get();

// can receive 3 4 5 bit sit message
for (int i = 0; i < count / 3; i++) {
assertEquals(consumer.receive().getValue().intValue(), i + 3);
}

// can't receive message anymore
assertNull(consumer.receive(2, TimeUnit.SECONDS));
}

@Test(dataProvider="enableBatch")
private void produceCommitTest(boolean enableBatch) throws Exception {
@Cleanup
Expand Down Expand Up @@ -674,7 +730,7 @@ private void txnCumulativeAckTest(boolean batchEnable, int maxBatchSize, Subscri
admin.topics().delete(normalTopic, true);
}

private Transaction getTxn() throws Exception {
public Transaction getTxn() throws Exception {
return pulsarClient
.newTransaction()
.withTransactionTimeout(10, TimeUnit.SECONDS)
Expand Down

0 comments on commit e0c0d5e

Please sign in to comment.