Skip to content

Commit

Permalink
[fix][broker] Fix NPE when reset Replicator's cursor by position. (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
lifepuzzlefun committed Jun 30, 2023
1 parent 8d6b931 commit 5abadbe
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4114,31 +4114,43 @@ private CompletableFuture<Void> internalExpireMessagesNonPartitionedTopicByPosit
return;
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
PersistentSubscription sub = null;
PersistentReplicator repl = null;

if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
if (repl == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Replicator not found"));
return;
}
} else {
sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
}

CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);

PersistentReplicator finalRepl = repl;
PersistentSubscription finalSub = sub;

batchSizeFuture.thenAccept(bi -> {
PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
boolean issued;
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
if (repl == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Replicator not found"));
return;
}
issued = repl.expireMessages(position);
issued = finalRepl.expireMessages(position);
} else {
issued = sub.expireMessages(position);
issued = finalSub.expireMessages(position);
}

if (issued) {
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position,
topicName, subName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.annotation.Nullable;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -43,6 +44,7 @@
public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final ManagedCursor cursor;
private final String subName;
private final PersistentTopic topic;
private final String topicName;
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
Expand All @@ -57,9 +59,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
.newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress");

public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor,
PersistentSubscription subscription) {
this.topicName = topicName;
public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
@Nullable PersistentSubscription subscription) {
this.topic = topic;
this.topicName = topic.getName();
this.cursor = cursor;
this.subName = subscriptionName;
this.subscription = subscription;
Expand Down Expand Up @@ -98,11 +101,12 @@ public boolean expireMessages(int messageTTLInSeconds) {

public boolean expireMessages(Position messagePosition) {
// If it's beyond last position of this topic, do nothing.
if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) {
PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition();
if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond "
+ "current topic's last position {}", topicName, subName, messagePosition,
subscription.getTopic().getLastPosition());
+ "current topic's last position {}", topicName, subName, messagePosition,
topicLastPosition);
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man
brokerService, replicationClient);
this.topic = localTopic;
this.cursor = cursor;
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopicName,
this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic,
Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this);
this.setReplicated(replicated);
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
Expand Down Expand Up @@ -230,7 +231,11 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
});
assertTrue(ex.get());

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
Optional.empty(), null);
Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
Expand Down Expand Up @@ -407,7 +412,11 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
bkc.deleteLedger(ledgers.get(1).getLedgerId());
bkc.deleteLedger(ledgers.get(2).getLedgerId());

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
Position previousMarkDelete = null;
for (int i = 0; i < totalEntries; i++) {
monitor.expireMessages(1);
Expand Down Expand Up @@ -444,15 +453,16 @@ void testMessageExpiryWithPosition() throws Exception {
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

PersistentSubscription subscription = mock(PersistentSubscription.class);
Topic topic = mock(Topic.class);
PersistentTopic topic = mock(PersistentTopic.class);
when(subscription.getTopic()).thenReturn(topic);
when(topic.getName()).thenReturn("topicname");

for (int i = 0; i < totalEntries; i++) {
positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
}
when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));

PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname",
PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor(topic,
cursor.getName(), cursor, subscription));
assertEquals(cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1));
boolean issued;
Expand Down Expand Up @@ -491,7 +501,7 @@ void testMessageExpiryWithPosition() throws Exception {
clearInvocations(monitor);

ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname",
PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic,
cursor.getName(), mockCursor, subscription));
// Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to
// expire message shouldn't issue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,36 @@ public void testReplicatorClearBacklog() throws Exception {
assertEquals(status.getReplicationBacklog(), 0);
}


@Test(timeOut = 30000)
public void testResetReplicatorSubscriptionPosition() throws Exception {
final TopicName dest = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));

@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);

// Produce from cluster1 and consume from the rest
for (int i = 0; i < 10; i++) {
producer1.produce(2);
}

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();

PersistentReplicator replicator = (PersistentReplicator) spy(
topic.getReplicators().get(topic.getReplicators().keys().get(0)));

MessageId id = topic.getLastMessageId().get();
admin1.topics().expireMessages(dest.getPartitionedTopicName(),
replicator.getCursor().getName(),
id,false);

replicator.updateRates();

ReplicatorStats status = replicator.getStats();
assertEquals(status.getReplicationBacklog(), 0);
}

@Test(timeOut = 30000)
public void testResetCursorNotFail() throws Exception {

Expand Down

0 comments on commit 5abadbe

Please sign in to comment.