Skip to content

Commit

Permalink
[fix][broker] Avoid compaction task stuck when the last message to co…
Browse files Browse the repository at this point in the history
…mpact is a marker (#21718)
  • Loading branch information
coderzc authored Dec 20, 2023
1 parent ed68ec1 commit fc393f6
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.compaction.Compactor;
import org.checkerframework.checker.nullness.qual.Nullable;

@Slf4j
Expand Down Expand Up @@ -174,13 +175,15 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
// because consumer can receive message is smaller than maxReadPosition,
// so this marker is useless for this subscription
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
if (cursor == null || !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
// because consumer can receive message is smaller than maxReadPosition,
// so this marker is useless for this subscription
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
}
} else if (((PersistentTopic) subscription.getTopic())
.isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
(PositionImpl) entry.getPosition())) {
Expand All @@ -192,19 +195,26 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
}
}

if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
if (msgMetadata == null || (Markers.isServerOnlyMarker(msgMetadata))) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
final int readerIndex = metadataAndPayload.readerIndex();
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
metadataAndPayload.readerIndex(readerIndex);
}

entries.set(i, null);
entry.release();
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
Collections.emptyMap());
continue;
// Deliver marker to __compaction cursor to avoid compaction task stuck,
// and filter out them when doing topic compaction.
if (msgMetadata == null || cursor == null
|| !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
entries.set(i, null);
entry.release();
individualAcknowledgeMessageIfNeeded(Collections.singletonList(pos),
Collections.emptyMap());
continue;
}
} else if (trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
// The message is marked for delayed delivery. Ignore for now.
entries.set(i, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -130,7 +131,10 @@ private void phaseOneLoop(RawReader reader,
boolean replaceMessage = false;
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
if (RawBatchConverter.isReadableBatch(metadata)) {
if (Markers.isServerOnlyMarker(metadata)) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
deletedMessage = true;
} else if (RawBatchConverter.isReadableBatch(metadata)) {
try {
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;
Expand Down Expand Up @@ -262,7 +266,10 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
if (Markers.isServerOnlyMarker(metadata)) {
messageToAdd = Optional.empty();
} else if (RawBatchConverter.isReadableBatch(metadata)) {
try {
messageToAdd = rebatchMessage(reader.getTopic(),
m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -822,6 +823,66 @@ public void testWriteMarkerTaskOfReplicateSubscriptions(boolean isTopicPolicyEna
pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(false);
}

@Test
public void testReplicatedSubscriptionWithCompaction() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
final String topicName = "persistent://" + namespace + "/testReplicatedSubscriptionWithCompaction";
final String subName = "sub";

admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
admin1.topics().createNonPartitionedTopic(topicName);
admin1.topicPolicies().setCompactionThreshold(topicName, 100 * 1024 * 1024L);

@Cleanup final PulsarClient client = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS).build();

Producer<String> producer = client.newProducer(Schema.STRING).topic(topicName).create();
producer.newMessage().key("K1").value("V1").send();
producer.newMessage().key("K1").value("V2").send();
producer.close();

createReplicatedSubscription(client, topicName, subName, true);
Awaitility.await().untilAsserted(() -> {
Map<String, Boolean> status = admin1.topics().getReplicatedSubscriptionStatus(topicName, subName);
assertTrue(status.get(topicName));
});

Awaitility.await().untilAsserted(() -> {
PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(topicName, false).get().get();
ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
Assert.assertTrue(rsc1.getLastCompletedSnapshotId().isPresent());
assertEquals(t1.getPendingWriteOps().get(), 0L);
});

admin1.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin1.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("sub2")
.subscriptionType(SubscriptionType.Exclusive)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
while (true) {
Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
if (receive == null) {
break;
}

result.add(receive.getValue());
}

Assert.assertEquals(result, List.of("V2"));
}

/**
* Disable replication subscription.
* Test scheduled task case.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -1849,4 +1850,62 @@ public void testReadCommittedWithReadCompacted() throws Exception{
Assert.assertEquals(messages, List.of("V2", "V3"));
}


@Test
public void testReadCommittedWithCompaction() throws Exception{
final String namespace = "tnx/ns-prechecks";
final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);

admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024);

@Cleanup
Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

producer.newMessage().key("K1").value("V1").send();

Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
producer.newMessage(txn).key("K2").value("V2").send();
producer.newMessage(txn).key("K3").value("V3").send();
txn.commit().get();

producer.newMessage().key("K1").value("V4").send();

Transaction txn2 = pulsarClient.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
producer.newMessage(txn2).key("K2").value("V5").send();
producer.newMessage(txn2).key("K3").value("V6").send();
txn2.commit().get();

admin.topics().triggerCompaction(topic);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topic).status,
LongRunningProcessStatus.Status.SUCCESS);
});

@Cleanup
Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("sub")
.subscriptionType(SubscriptionType.Exclusive)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
while (true) {
Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
if (receive == null) {
break;
}

result.add(receive.getValue());
}

Assert.assertEquals(result, List.of("V4", "V5", "V6"));
}

}

0 comments on commit fc393f6

Please sign in to comment.