diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index fd20c31d32fb8..07082eec6b949 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -119,7 +119,7 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, return CompletableFuture.completedFuture(null); } else { long endPoint = Math.min(context.ledger.getLastAddConfirmed(), - startPoint + numberOfEntriesToRead); + startPoint + (numberOfEntriesToRead - 1)); return readEntries(context.ledger, startPoint, endPoint) .thenAccept((entries) -> { Entry lastEntry = entries.get(entries.size() - 1); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java index dd218c9be51a2..c420767d1e884 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java @@ -78,7 +78,7 @@ public CompletableFuture> readCompactedEntries(@Nonnull Position sta return CompletableFuture.completedFuture(Collections.emptyList()); } long endPoint = - Math.min(context.ledger.getLastAddConfirmed(), startPoint + numberOfEntriesToRead); + Math.min(context.ledger.getLastAddConfirmed(), startPoint + (numberOfEntriesToRead - 1)); return CompactedTopicImpl.readEntries(context.ledger, startPoint, endPoint); })).whenComplete((result, ex) -> { if (ex == null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index d11a2f87192ff..c8105b011254b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -61,6 +61,7 @@ import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.service.persistent.SystemTopic; +import org.apache.pulsar.client.admin.LongRunningProcessStatus; import org.apache.pulsar.client.api.CompressionType; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.CryptoKeyReader; @@ -70,11 +71,14 @@ import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.BatchMessageIdImpl; +import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -1783,4 +1787,38 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { Assert.assertNotEquals(ledgerId, -1L); }); } + + @Test(timeOut = 100000) + public void testReceiverQueueSize() throws Exception { + final String topicName = "persistent://my-property/use/my-ns/testReceiverQueueSize" + UUID.randomUUID(); + final String subName = "my-sub"; + final int receiveQueueSize = 1; + @Cleanup + PulsarClient client = newPulsarClient(lookupUrl.toString(), 100); + Producer producer = pulsarClient.newProducer(Schema.STRING) + .enableBatching(false).topic(topicName).create(); + + for (int i = 0; i < 10; i++) { + producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync(); + } + producer.flush(); + + admin.topics().triggerCompaction(topicName); + + Awaitility.await().untilAsserted(() -> { + assertEquals(admin.topics().compactionStatus(topicName).status, + LongRunningProcessStatus.Status.SUCCESS); + }); + + ConsumerImpl consumer = (ConsumerImpl) client.newConsumer(Schema.STRING) + .topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName) + .subscribe(); + + //Give some time to consume + Awaitility.await() + .untilAsserted(() -> Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(), + receiveQueueSize)); + consumer.close(); + producer.close(); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java index 5810e0180d07c..b92e54a37bf7b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/TopicCompactionServiceTest.java @@ -36,19 +36,25 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfoImpl; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class TopicCompactionServiceTest extends CompactorTest { - @Test - public void test() throws PulsarClientException, PulsarAdminException { + @BeforeMethod + @Override + public void setup() throws Exception { + super.setup(); admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build()); TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")); String defaultTenant = "prop-xyz"; admin.tenants().createTenant(defaultTenant, tenantInfo); String defaultNamespace = defaultTenant + "/ns1"; admin.namespaces().createNamespace(defaultNamespace, Set.of("test")); + } + @Test + public void test() throws PulsarClientException, PulsarAdminException { String topic = "persistent://prop-xyz/ns1/my-topic"; PulsarTopicCompactionService service = new PulsarTopicCompactionService(topic, bk, () -> compactor); @@ -114,5 +120,8 @@ public void test() throws PulsarClientException, PulsarAdminException { assertEquals(data, "B_3"); } }); + + List entries2 = service.readCompactedEntries(PositionImpl.EARLIEST, 1).join(); + assertEquals(entries2.size(), 1); } }