From 643ffadb7a868164b62508acef5b3ebb01694965 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Wed, 12 Jul 2023 12:43:41 +0800 Subject: [PATCH] [fix][broker][branch-2.10] Fix NPE when reset Replicator's cursor by position. (#20597) (#20781) --- .../admin/impl/PersistentTopicsBase.java | 42 ++++++++++++------- .../PersistentMessageExpiryMonitor.java | 23 ++++++++-- .../persistent/PersistentReplicator.java | 2 +- .../persistent/PersistentSubscription.java | 2 +- .../service/PersistentMessageFinderTest.java | 20 ++++++--- .../pulsar/broker/service/ReplicatorTest.java | 30 +++++++++++++ 6 files changed, 94 insertions(+), 25 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 44a5a8903315a..da37d1b7e545c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -3794,31 +3794,43 @@ private CompletableFuture internalExpireMessagesNonPartitionedTopicByPosit return; } try { - PersistentSubscription sub = topic.getSubscription(subName); - if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Subscription not found")); - return; + PersistentSubscription sub = null; + PersistentReplicator repl = null; + + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + repl = (PersistentReplicator) + topic.getPersistentReplicator(remoteCluster); + if (repl == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Replicator not found")); + return; + } + } else { + sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); + return; + } } + CompletableFuture batchSizeFuture = new CompletableFuture<>(); getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex); + + PersistentReplicator finalRepl = repl; + PersistentSubscription finalSub = sub; + batchSizeFuture.thenAccept(bi -> { PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId); boolean issued; try { if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) - topic.getPersistentReplicator(remoteCluster); - if (repl == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Replicator not found")); - return; - } - issued = repl.expireMessages(position); + issued = finalRepl.expireMessages(position); } else { - issued = sub.expireMessages(position); + issued = finalSub.expireMessages(position); } + if (issued) { log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position, topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 4335762a21b82..35c6b807589ef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -43,6 +43,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { private final ManagedCursor cursor; private final String subName; + private final PersistentTopic topic; private final String topicName; private final Rate msgExpired; private final LongAdder totalMsgExpired; @@ -57,9 +58,24 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater .newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress"); + public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, + PersistentSubscription subscription) { + this.topic = topic; + this.topicName = topic.getName(); + this.cursor = cursor; + this.subName = subscriptionName; + this.subscription = subscription; + this.msgExpired = new Rate(); + this.totalMsgExpired = new LongAdder(); + // check to avoid test failures + this.autoSkipNonRecoverableData = this.cursor.getManagedLedger() != null + && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData(); + } + public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor, PersistentSubscription subscription) { this.topicName = topicName; + this.topic = subscription.topic; this.cursor = cursor; this.subName = subscriptionName; this.subscription = subscription; @@ -98,11 +114,12 @@ public boolean expireMessages(int messageTTLInSeconds) { public boolean expireMessages(Position messagePosition) { // If it's beyond last position of this topic, do nothing. - if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) { + PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition(); + if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond " - + "current topic's last position {}", topicName, subName, messagePosition, - subscription.getTopic().getLastPosition()); + + "current topic's last position {}", topicName, subName, messagePosition, + topicLastPosition); } return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 8254e512afc0a..adfae64489d83 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -114,7 +114,7 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String replicationClient); this.topic = topic; this.cursor = cursor; - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, + this.expiryMonitor = new PersistentMessageExpiryMonitor((PersistentTopic) localTopic, Codec.decode(cursor.getName()), cursor, null); HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 9654915685bb3..a14a5108dfff4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -140,7 +140,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); + this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this); this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 05c07d8f44ed4..5287d842727bd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -62,6 +62,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ResetCursorData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; @@ -230,7 +231,11 @@ public void findEntryFailed(ManagedLedgerException exception, Optional }); assertTrue(ex.get()); - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); + + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), Optional.empty(), null); Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress"); @@ -407,7 +412,11 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { bkc.deleteLedger(ledgers.get(1).getLedgerId()); bkc.deleteLedger(ledgers.get(2).getLedgerId()); - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); + + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); Position previousMarkDelete = null; for (int i = 0; i < totalEntries; i++) { monitor.expireMessages(1); @@ -444,15 +453,16 @@ void testMessageExpiryWithPosition() throws Exception { ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); PersistentSubscription subscription = mock(PersistentSubscription.class); - Topic topic = mock(Topic.class); + PersistentTopic topic = mock(PersistentTopic.class); when(subscription.getTopic()).thenReturn(topic); + when(topic.getName()).thenReturn("topicname"); for (int i = 0; i < totalEntries; i++) { positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i))); } when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1)); - PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname", + PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor(topic, cursor.getName(), cursor, subscription)); assertEquals(cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1)); boolean issued; @@ -491,7 +501,7 @@ void testMessageExpiryWithPosition() throws Exception { clearInvocations(monitor); ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class); - PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname", + PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic, cursor.getName(), mockCursor, subscription)); // Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to // expire message shouldn't issue. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 158d223336b0d..85c8eca8fa054 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -726,6 +726,36 @@ public void testReplicatorClearBacklog() throws Exception { assertEquals(status.getReplicationBacklog(), 0); } + + @Test(timeOut = 30000) + public void testResetReplicatorSubscriptionPosition() throws Exception { + final TopicName dest = TopicName + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription")); + + @Cleanup + MessageProducer producer1 = new MessageProducer(url1, dest); + + // Produce from cluster1 and consume from the rest + for (int i = 0; i < 10; i++) { + producer1.produce(2); + } + + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); + + PersistentReplicator replicator = (PersistentReplicator) spy( + topic.getReplicators().get(topic.getReplicators().keys().get(0))); + + MessageId id = topic.getLastMessageId().get(); + admin1.topics().expireMessages(dest.getPartitionedTopicName(), + replicator.getCursor().getName(), + id,false); + + replicator.updateRates(); + + ReplicatorStats status = replicator.getStats(); + assertEquals(status.getReplicationBacklog(), 0); + } + @Test(timeOut = 30000) public void testResetCursorNotFail() throws Exception {