diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 3631feccca946f..4500d277852403 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -3506,7 +3506,7 @@ public ManagedCursorMXBean getStats() { return this.mbean; } - void updateReadStats(int readEntriesCount, long readEntriesSize) { + public void updateReadStats(int readEntriesCount, long readEntriesSize) { this.entriesReadCount += readEntriesCount; this.entriesReadSize += readEntriesSize; } @@ -3538,7 +3538,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { }, null); } - private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { + public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) { if (maxSizeBytes == NO_MAX_SIZE_LIMIT) { return maxEntries; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 7cbf7bd2c787af..67265f626a19ba 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -347,7 +347,7 @@ protected void readMoreEntries(Consumer consumer) { } havePendingRead = true; if (consumer.readCompacted()) { - topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, + topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead, this, consumer); } else { ReadEntriesCtx readEntriesCtx = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java index efe9de778a3e7e..22dcc48994c1dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java @@ -217,7 +217,7 @@ protected void readMoreEntries(Consumer consumer) { havePendingRead = true; if (consumer.readCompacted()) { - topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead, + topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead, this, consumer); } else { streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index e1a10b3bbb2121..c575a872fd60a0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -30,7 +30,8 @@ public interface CompactedTopic { CompletableFuture newCompactedLedger(Position p, long compactedLedgerId); CompletableFuture deleteCompactedLedger(long compactedLedgerId); void asyncReadEntriesOrWait(ManagedCursor cursor, - int numberOfEntriesToRead, + int maxEntries, + long bytesToRead, boolean isFirstRead, ReadEntriesCallback callback, Consumer consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index a0c45aa750f7a9..703ba688d3bdc2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -42,6 +42,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.EntryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.Consumer; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx; @@ -87,7 +88,8 @@ public CompletableFuture deleteCompactedLedger(long compactedLedgerId) { @Override public void asyncReadEntriesOrWait(ManagedCursor cursor, - int numberOfEntriesToRead, + int maxEntries, + long bytesToRead, boolean isFirstRead, ReadEntriesCallback callback, Consumer consumer) { synchronized (this) { @@ -102,8 +104,11 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH); if (compactionHorizon == null || compactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST); + cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST); } else { + ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor; + int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead); + compactedTopicContext.thenCompose( (context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache) .thenCompose((startPoint) -> { @@ -128,6 +133,12 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, } return readEntries(context.ledger, startPoint, endPoint) .thenAccept((entries) -> { + long entriesSize = 0; + for (Entry entry : entries) { + entriesSize += entry.getLength(); + } + managedCursor.updateReadStats(entries.size(), entriesSize); + Entry lastEntry = entries.get(entries.size() - 1); // The compaction task depends on the last snapshot and the incremental // entries to build the new snapshot. So for the compaction cursor, we diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 535d6715a3055a..3985069c6eba8d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -25,6 +25,7 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -55,11 +56,14 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.Position; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -89,6 +93,7 @@ import org.apache.pulsar.common.protocol.Markers; import org.apache.pulsar.common.util.FutureUtil; import org.awaitility.Awaitility; +import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; @@ -1871,6 +1876,57 @@ public void testReceiverQueueSize() throws Exception { producer.close(); } + @Test + public void testDispatcherMaxReadSizeBytes() throws Exception { + final String topicName = + "persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" + UUID.randomUUID(); + final String subName = "my-sub"; + final int receiveQueueSize = 1; + @Cleanup + PulsarClient client = newPulsarClient(lookupUrl.toString(), 100); + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topicName).create(); + + for (int i = 0; i < 10; i+=2) { + producer.newMessage().key(null).value(new byte[4*1024*1024]).send(); + } + producer.flush(); + + admin.topics().triggerCompaction(topicName); + + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topics().compactionStatus(topicName).status, + LongRunningProcessStatus.Status.SUCCESS); + }); + + admin.topics().unload(topicName); + + ConsumerImpl consumer = (ConsumerImpl) client.newConsumer(Schema.BYTES) + .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName) + .subscribe(); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); + PersistentSubscription persistentSubscription = topic.getSubscriptions().get(subName); + PersistentDispatcherSingleActiveConsumer dispatcher = + Mockito.spy((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher()); + FieldUtils.writeDeclaredField(persistentSubscription, "dispatcher", dispatcher, true); + + Awaitility.await().untilAsserted(() -> { + assertSame(consumer.getStats().getMsgNumInReceiverQueue(), 1); + }); + + consumer.increaseAvailablePermits(2); + + Thread.sleep(2000); + + Mockito.verify(dispatcher, Mockito.atLeastOnce()) + .readEntriesComplete(Mockito.argThat(argument -> argument.size() == 1), + Mockito.any(PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.class)); + + consumer.close(); + producer.close(); + } + @Test public void testCompactionDuplicate() throws Exception { String topic = "persistent://my-property/use/my-ns/testCompactionDuplicate"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java index 54563431052eb7..a6e3108b2baa79 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/StrategicCompactionTest.java @@ -213,4 +213,25 @@ public void testSameBatchCompactToSameBatch() throws Exception { } } + + @Override + public void testCompactCompressedBatching() throws Exception { + compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 10); + super.testCompactCompressedBatching(); + compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1); + } + + @Override + public void testCompactEncryptedAndCompressedBatching() throws Exception { + compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 10); + super.testCompactEncryptedAndCompressedBatching(); + compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1); + } + + @Override + public void testCompactEncryptedBatching() throws Exception { + compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 10); + super.testCompactEncryptedBatching(); + compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1); + } }