Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] Subscription stuck after calling Admin API analyzeSubscriptionBacklog #22019

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,11 @@ public class ManagedCursorImpl implements ManagedCursor {
position.ackSet = null;
return position;
};
private final RangeSetWrapper<PositionImpl> individualDeletedMessages;
protected final RangeSetWrapper<PositionImpl> individualDeletedMessages;

// Maintain the deletion status for batch messages
// (ledgerId, entryId) -> deletion indexes
private final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
protected final ConcurrentSkipListMap<PositionImpl, BitSetRecyclable> batchDeletedIndexes;
private final ReadWriteLock lock = new ReentrantReadWriteLock();

private RateLimiter markDeleteLimiter;
Expand Down Expand Up @@ -3622,4 +3622,29 @@ public boolean isCacheReadEntry() {
public ManagedLedgerConfig getConfig() {
return config;
}

/**
 * Creates a non-durable cursor that mirrors this cursor's acknowledgment state.
 *
 * <p>The new cursor is requested from the ledger at this cursor's current mark-deleted position. The
 * individually deleted ranges and the per-position batch deletion bit sets of this cursor are then
 * copied into it; each bit set is duplicated rather than shared.
 *
 * @param nonDurableCursorName name to give the new non-durable cursor
 * @return the newly created non-durable cursor
 * @throws ManagedLedgerException if the ledger fails to create the non-durable cursor
 */
public ManagedCursor duplicateToNonDurableCursor(String nonDurableCursorName) throws ManagedLedgerException {
    final NonDurableCursorImpl duplicate = (NonDurableCursorImpl) ledger.newNonDurableCursor(
            getMarkDeletedPosition(), nonDurableCursorName);

    if (individualDeletedMessages != null) {
        // Replay every individually deleted range into the duplicate as an open-closed range.
        this.individualDeletedMessages.forEach(range -> {
            final PositionImpl lower = range.lowerEndpoint();
            final PositionImpl upper = range.upperEndpoint();
            duplicate.individualDeletedMessages.addOpenClosed(
                    lower.getLedgerId(), lower.getEntryId(),
                    upper.getLedgerId(), upper.getEntryId());
            // NOTE(review): returning true presumably tells forEach to keep iterating - confirm.
            return true;
        });
    }

    if (batchDeletedIndexes != null) {
        // Store a copy of each bit set so the duplicate never shares a BitSetRecyclable with this cursor.
        this.batchDeletedIndexes.forEach((position, deletedIndexes) ->
                duplicate.batchDeletedIndexes.put(position, BitSetRecyclable.valueOf(deletedIndexes)));
    }

    return duplicate;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -530,9 +531,15 @@ public String getTypeString() {
return "Null";
}

@Override
public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position> position) {

final ManagedLedger managedLedger = topic.getManagedLedger();
final String newNonDurableCursorName = "analyze-backlog-" + UUID.randomUUID();
ManagedCursor newNonDurableCursor;
try {
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
newNonDurableCursor = ((ManagedCursorImpl) cursor).duplicateToNonDurableCursor(newNonDurableCursorName);
} catch (ManagedLedgerException e) {
return CompletableFuture.failedFuture(e);
}
long start = System.currentTimeMillis();
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Starting to analyze backlog", topicName, subName);
Expand All @@ -547,7 +554,7 @@ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position>
AtomicLong rejectedMessages = new AtomicLong();
AtomicLong rescheduledMessages = new AtomicLong();

Position currentPosition = cursor.getMarkDeletedPosition();
Position currentPosition = newNonDurableCursor.getMarkDeletedPosition();

if (log.isDebugEnabled()) {
log.debug("[{}][{}] currentPosition {}",
Expand Down Expand Up @@ -607,7 +614,7 @@ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position>

return true;
};
return cursor.scan(
CompletableFuture<AnalyzeBacklogResult> res = newNonDurableCursor.scan(
position,
condition,
batchSize,
Expand All @@ -634,7 +641,22 @@ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position>
topicName, subName, end - start, result);
return result;
});
res.whenComplete((__, ex) -> {
managedLedger.asyncDeleteCursor(newNonDurableCursorName,
new AsyncCallbacks.DeleteCursorCallback(){
@Override
public void deleteCursorComplete(Object ctx) {
// Nothing to do: the temporary non-durable cursor is gone, and the returned result future
// does not depend on the outcome of this deletion.
}

/**
 * Logs a warning when the temporary non-durable cursor could not be deleted.
 *
 * @param exception the failure reported by the managed ledger
 * @param ctx       opaque callback context (unused)
 */
@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
    // Pass the exception as the final argument so the failure cause and stack trace are logged
    // instead of being silently dropped.
    log.warn("[{}][{}] Delete non-durable cursor[{}] failed when analyze backlog.",
            topicName, subName, newNonDurableCursor.getName(), exception);
}
}, null);
});
return res;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3389,4 +3389,33 @@ private void testSetBacklogQuotasNamespaceLevelIfRetentionExists() throws Except
// cleanup.
admin.namespaces().deleteNamespace(ns);
}

/**
 * Verifies that calling the "analyzeSubscriptionBacklog" admin API does not leave the subscription stuck:
 * after the call, the consumer must still be able to receive and acknowledge every published message.
 */
@Test
private void testAnalyzeSubscriptionBacklogNotCauseStuck() throws Exception {
    final String topic = BrokerTestUtil.newUniqueName("persistent://" + defaultNamespace + "/tp");
    final String subscription = "s1";
    admin.topics().createNonPartitionedTopic(topic);

    // try-with-resources guarantees the consumer and producer are closed even when an assertion fails,
    // so a failing run does not leak client resources into later tests.
    try (Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic)
                 .subscriptionName(subscription).receiverQueueSize(0).subscribe();
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create()) {
        // Send 10 messages.
        for (int i = 0; i < 10; i++) {
            producer.send(i + "");
        }

        // Verify consumer can receive all messages after calling "analyzeSubscriptionBacklog".
        admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.of(MessageIdImpl.earliest));
        for (int i = 0; i < 10; i++) {
            Awaitility.await().untilAsserted(() -> {
                Message<String> m = consumer.receive();
                assertNotNull(m);
                consumer.acknowledge(m);
            });
        }
    }

    // cleanup.
    admin.topics().delete(topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,17 @@ private void verifyBacklog(String topic, String subscription, int numEntries, in
AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklogResult
= admin.topics().analyzeSubscriptionBacklog(topic, subscription, Optional.empty());

assertEquals(numEntries, analyzeSubscriptionBacklogResult.getEntries());
assertEquals(numEntries, analyzeSubscriptionBacklogResult.getFilterAcceptedEntries());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedEntries());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledEntries());
assertEquals(analyzeSubscriptionBacklogResult.getEntries(), numEntries);
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedEntries(), numEntries);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedEntries(), 0);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);

assertEquals(numMessages, analyzeSubscriptionBacklogResult.getMessages());
assertEquals(numMessages, analyzeSubscriptionBacklogResult.getFilterAcceptedMessages());
assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRejectedMessages());
assertEquals(analyzeSubscriptionBacklogResult.getMessages(), numMessages);
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), numMessages);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0);

assertEquals(0, analyzeSubscriptionBacklogResult.getFilterRescheduledMessages());
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledMessages(), 0);
assertFalse(analyzeSubscriptionBacklogResult.isAborted());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ public static BitSetRecyclable valueOf(byte[] bytes) {
return BitSetRecyclable.valueOf(ByteBuffer.wrap(bytes));
}

/**
 * Creates a copy of the given {@code BitSetRecyclable}.
 *
 * @param src the bit set to copy
 * @return a bit set built from the words of {@code src}
 */
public static BitSetRecyclable valueOf(BitSetRecyclable src) {
    // Delegates to the words-array overload, which performs the array copy internally.
    return BitSetRecyclable.valueOf(src.words);
}

/**
* Returns a new bit set containing all the bits in the given byte
* buffer between its position and limit.
Expand Down
Loading