Skip to content

Commit

Permalink
[improve][broker] Make read compacted entries support maxReadSizeByte…
Browse files Browse the repository at this point in the history
…s limitation (apache#21065)

(cherry picked from commit 835e9b6)
  • Loading branch information
coderzc committed Sep 18, 2023
1 parent 3a30526 commit 79139dd
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3506,7 +3506,7 @@ public ManagedCursorMXBean getStats() {
return this.mbean;
}

void updateReadStats(int readEntriesCount, long readEntriesSize) {
public void updateReadStats(int readEntriesCount, long readEntriesSize) {
this.entriesReadCount += readEntriesCount;
this.entriesReadSize += readEntriesSize;
}
Expand Down Expand Up @@ -3538,7 +3538,7 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
}, null);
}

private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ protected void readMoreEntries(Consumer consumer) {
}
havePendingRead = true;
if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
this, consumer);
} else {
ReadEntriesCtx readEntriesCtx =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ protected void readMoreEntries(Consumer consumer) {
havePendingRead = true;

if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, isFirstRead,
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
this, consumer);
} else {
streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ public interface CompactedTopic {
CompletableFuture<CompactedTopicContext> newCompactedLedger(Position p, long compactedLedgerId);
CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId);
void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
int maxEntries,
long bytesToRead,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx;
Expand Down Expand Up @@ -87,7 +88,8 @@ public CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId) {

@Override
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int numberOfEntriesToRead,
int maxEntries,
long bytesToRead,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
synchronized (this) {
Expand All @@ -102,8 +104,11 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH);
if (compactionHorizon == null
|| compactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(numberOfEntriesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
} else {
ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);

compactedTopicContext.thenCompose(
(context) -> findStartPoint(cursorPosition, context.ledger.getLastAddConfirmed(), context.cache)
.thenCompose((startPoint) -> {
Expand All @@ -128,6 +133,12 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
}
return readEntries(context.ledger, startPoint, endPoint)
.thenAccept((entries) -> {
long entriesSize = 0;
for (Entry entry : entries) {
entriesSize += entry.getLength();
}
managedCursor.updateReadStats(entries.size(), entriesSize);

Entry lastEntry = entries.get(entries.size() - 1);
// The compaction task depends on the last snapshot and the incremental
// entries to build the new snapshot. So for the compaction cursor, we
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -55,11 +56,14 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
Expand Down Expand Up @@ -89,6 +93,7 @@
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -1871,6 +1876,57 @@ public void testReceiverQueueSize() throws Exception {
producer.close();
}

@Test
public void testDispatcherMaxReadSizeBytes() throws Exception {
final String topicName =
"persistent://my-property/use/my-ns/testDispatcherMaxReadSizeBytes" + UUID.randomUUID();
final String subName = "my-sub";
final int receiveQueueSize = 1;
@Cleanup
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName).create();

for (int i = 0; i < 10; i+=2) {
producer.newMessage().key(null).value(new byte[4*1024*1024]).send();
}
producer.flush();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

admin.topics().unload(topicName);

ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) client.newConsumer(Schema.BYTES)
.topic(topicName).readCompacted(true).receiverQueueSize(receiveQueueSize).subscriptionName(subName)
.subscribe();

PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
PersistentSubscription persistentSubscription = topic.getSubscriptions().get(subName);
PersistentDispatcherSingleActiveConsumer dispatcher =
Mockito.spy((PersistentDispatcherSingleActiveConsumer) persistentSubscription.getDispatcher());
FieldUtils.writeDeclaredField(persistentSubscription, "dispatcher", dispatcher, true);

Awaitility.await().untilAsserted(() -> {
assertSame(consumer.getStats().getMsgNumInReceiverQueue(), 1);
});

consumer.increaseAvailablePermits(2);

Thread.sleep(2000);

Mockito.verify(dispatcher, Mockito.atLeastOnce())
.readEntriesComplete(Mockito.argThat(argument -> argument.size() == 1),
Mockito.any(PersistentDispatcherSingleActiveConsumer.ReadEntriesCtx.class));

consumer.close();
producer.close();
}

@Test
public void testCompactionDuplicate() throws Exception {
String topic = "persistent://my-property/use/my-ns/testCompactionDuplicate";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,25 @@ public void testSameBatchCompactToSameBatch() throws Exception {
}

}

@Override
public void testCompactCompressedBatching() throws Exception {
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 10);
super.testCompactCompressedBatching();
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
}

@Override
public void testCompactEncryptedAndCompressedBatching() throws Exception {
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 10);
super.testCompactEncryptedAndCompressedBatching();
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
}

@Override
public void testCompactEncryptedBatching() throws Exception {
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 10);
super.testCompactEncryptedBatching();
compactor = new StrategicTwoPhaseCompactor(conf, pulsarClient, bk, compactionScheduler, 1);
}
}

0 comments on commit 79139dd

Please sign in to comment.