Skip to content

Commit

Permalink
Add test & fix code
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Dec 19, 2023
1 parent 55d929c commit 3ce2f31
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
// Message metadata was corrupted or the messages was a server-only marker

if (Markers.isReplicatedSubscriptionSnapshotMarker(msgMetadata)) {
final int readerIndex = metadataAndPayload.readerIndex();
processReplicatedSubscriptionSnapshot(pos, metadataAndPayload);
metadataAndPayload.readerIndex(readerIndex);
}

// Deliver marker to __compaction cursor to avoid compaction task stuck,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -822,6 +823,66 @@ public void testWriteMarkerTaskOfReplicateSubscriptions(boolean isTopicPolicyEna
pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(false);
}

@Test
public void testReplicatedSubscriptionWithCompaction() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscription");
final String topicName = "persistent://" + namespace + "/testReplicatedSubscriptionWithCompaction";
final String subName = "sub";

admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
admin1.topics().createNonPartitionedTopic(topicName);
admin1.topicPolicies().setCompactionThreshold(topicName, 100 * 1024 * 1024L);

@Cleanup final PulsarClient client = PulsarClient.builder().serviceUrl(url1.toString())
.statsInterval(0, TimeUnit.SECONDS).build();

Producer<String> producer = client.newProducer(Schema.STRING).topic(topicName).create();
producer.newMessage().key("K1").value("V1").send();
producer.newMessage().key("K1").value("V2").send();
producer.close();

createReplicatedSubscription(client, topicName, subName, true);
Awaitility.await().untilAsserted(() -> {
Map<String, Boolean> status = admin1.topics().getReplicatedSubscriptionStatus(topicName, subName);
assertTrue(status.get(topicName));
});

Awaitility.await().untilAsserted(() -> {
PersistentTopic t1 = (PersistentTopic) pulsar1.getBrokerService()
.getTopic(topicName, false).get().get();
ReplicatedSubscriptionsController rsc1 = t1.getReplicatedSubscriptionController().get();
Assert.assertTrue(rsc1.getLastCompletedSnapshotId().isPresent());
assertEquals(t1.getPendingWriteOps().get(), 0L);
});

admin1.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin1.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

@Cleanup
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName("sub2")
.subscriptionType(SubscriptionType.Exclusive)
.readCompacted(true)
.subscribe();
List<String> result = new ArrayList<>();
while (true) {
Message<String> receive = consumer.receive(2, TimeUnit.SECONDS);
if (receive == null) {
break;
}

result.add(receive.getValue());
}

Assert.assertEquals(result, List.of("V2"));
}

/**
* Disable replication subscription.
* Test scheduled task case.
Expand Down

0 comments on commit 3ce2f31

Please sign in to comment.