Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Fix incorrect number of read compacted entries (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Aug 14, 2023
1 parent 75d4d82 commit 63d9eaf
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
return CompletableFuture.completedFuture(null);
} else {
long endPoint = Math.min(context.ledger.getLastAddConfirmed(),
startPoint + numberOfEntriesToRead);
startPoint + (numberOfEntriesToRead - 1));
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
Entry lastEntry = entries.get(entries.size() - 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public CompletableFuture<List<Entry>> readCompactedEntries(@Nonnull Position sta
return CompletableFuture.completedFuture(Collections.emptyList());
}
long endPoint =
Math.min(context.ledger.getLastAddConfirmed(), startPoint + numberOfEntriesToRead);
Math.min(context.ledger.getLastAddConfirmed(), startPoint + (numberOfEntriesToRead - 1));
return CompactedTopicImpl.readEntries(context.ledger, startPoint, endPoint);
})).whenComplete((result, ex) -> {
if (ex == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.CryptoKeyReader;
Expand All @@ -70,11 +71,14 @@
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -1783,4 +1787,38 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
Assert.assertNotEquals(ledgerId, -1L);
});
}

@Test(timeOut = 100000)
public void testReceiverQueueSize() throws Exception {
final String topicName = "persistent://my-property/use/my-ns/testReceiverQueueSize" + UUID.randomUUID();
final String subName = "my-sub";
final int receiveQueueSize = 1;
@Cleanup
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i % 2)).value(String.valueOf(i)).sendAsync();
}
producer.flush();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
.subscribe();

//Give some time to consume
Awaitility.await()
.untilAsserted(() -> Assert.assertEquals(consumer.getStats().getMsgNumInReceiverQueue().intValue(),
receiveQueueSize));
consumer.close();
producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,25 @@
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class TopicCompactionServiceTest extends CompactorTest {

@Test
public void test() throws PulsarClientException, PulsarAdminException {
@BeforeMethod
@Override
public void setup() throws Exception {
super.setup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
String defaultTenant = "prop-xyz";
admin.tenants().createTenant(defaultTenant, tenantInfo);
String defaultNamespace = defaultTenant + "/ns1";
admin.namespaces().createNamespace(defaultNamespace, Set.of("test"));
}

@Test
public void test() throws PulsarClientException, PulsarAdminException {
String topic = "persistent://prop-xyz/ns1/my-topic";

PulsarTopicCompactionService service = new PulsarTopicCompactionService(topic, bk, () -> compactor);
Expand Down Expand Up @@ -114,5 +120,8 @@ public void test() throws PulsarClientException, PulsarAdminException {
assertEquals(data, "B_3");
}
});

List<Entry> entries2 = service.readCompactedEntries(PositionImpl.EARLIEST, 1).join();
assertEquals(entries2.size(), 1);
}
}

0 comments on commit 63d9eaf

Please sign in to comment.