From 59f0c0bc4f1c2b7bafbc74f7f4e31fd485d3e8e7 Mon Sep 17 00:00:00 2001 From: Cong Zhao Date: Mon, 4 Sep 2023 17:04:51 +0800 Subject: [PATCH] [improve][broker] Make read compacted entries support maxReadSizeBytes limitation (#21065) (cherry picked from commit 835e9b60f0a0f94aa9fa641a2a33d4719391897b) --- .../pulsar/compaction/TwoPhaseCompactor.java | 13 +++- .../pulsar/compaction/CompactionTest.java | 66 +++++++++++++++++++ 2 files changed, 76 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java index 886a523df02570..be08bd81c1e49c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java @@ -207,7 +207,7 @@ private CompletableFuture phaseTwoSeekThenLoop(RawReader reader, MessageId reader.seekAsync(from).thenCompose((v) -> { Semaphore outstanding = new Semaphore(MAX_OUTSTANDING); CompletableFuture loopPromise = new CompletableFuture<>(); - phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise); + phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise, MessageId.earliest); return loopPromise; }).thenCompose((v) -> closeLedger(ledger)) .thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId, @@ -229,7 +229,8 @@ private CompletableFuture phaseTwoSeekThenLoop(RawReader reader, MessageId } private void phaseTwoLoop(RawReader reader, MessageId to, Map latestForKey, - LedgerHandle lh, Semaphore outstanding, CompletableFuture promise) { + LedgerHandle lh, Semaphore outstanding, CompletableFuture promise, + MessageId lastCompactedMessageId) { if (promise.isDone()) { return; } @@ -238,6 +239,12 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map m.close(); return; } + + if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) { + phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId); + return; + } + try { MessageId id = m.getMessageId(); Optional messageToAdd = Optional.empty(); @@ -308,7 +315,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map } return; } - phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise); + phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, m.getMessageId()); } finally { m.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index c5dbd9c49aac91..24ca4e656ce89c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -1877,4 +1877,70 @@ public void testReceiverQueueSize() throws Exception { consumer.close(); producer.close(); } + + @Test + public void testCompactionDuplicate() throws Exception { + String topic = "persistent://my-property/use/my-ns/testCompactionDuplicate"; + final int numMessages = 1000; + final int maxKeys = 800; + + @Cleanup + Producer producer = pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .messageRoutingMode(MessageRoutingMode.SinglePartition) + .create(); + + // trigger compaction (create __compaction cursor) + admin.topics().triggerCompaction(topic); + + Map expected = new HashMap<>(); + Random r = new Random(0); + + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close(); + + for (int j = 0; j < numMessages; j++) { + int keyIndex = r.nextInt(maxKeys); + String key = "key" + keyIndex; + byte[] data = ("my-message-" + key + "-" + j).getBytes(); + producer.newMessage().key(key).value(data).send(); + expected.put(key, data); + } + + producer.flush(); + + // trigger compaction + admin.topics().triggerCompaction(topic); + + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topics().compactionStatus(topic).status, + LongRunningProcessStatus.Status.RUNNING); + }); + + // Wait for phase one to complete + Thread.sleep(500); + + // Unload topic make reader of compaction reconnect + admin.topics().unload(topic); + + Awaitility.await().untilAsserted(() -> { + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false); + // Compacted topic ledger should have same number of entry equals to number of unique key. + Assert.assertEquals(internalStats.compactedLedger.entries, expected.size()); + Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1); + Assert.assertFalse(internalStats.compactedLedger.offloaded); + }); + + // consumer with readCompacted enabled only get compacted entries + try (Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .readCompacted(true).subscribe()) { + while (true) { + Message m = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertEquals(expected.remove(m.getKey()), m.getData()); + if (expected.isEmpty()) { + break; + } + } + } + } }