diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 426c05fca3888..d8f8e7304d291 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4114,31 +4114,43 @@ private CompletableFuture internalExpireMessagesNonPartitionedTopicByPosit return; } try { - PersistentSubscription sub = topic.getSubscription(subName); - if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - getSubNotFoundErrorMessage(topicName.toString(), subName))); - return; + PersistentSubscription sub = null; + PersistentReplicator repl = null; + + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + repl = (PersistentReplicator) + topic.getPersistentReplicator(remoteCluster); + if (repl == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Replicator not found")); + return; + } + } else { + sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); + return; + } } + CompletableFuture batchSizeFuture = new CompletableFuture<>(); getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex); + + PersistentReplicator finalRepl = repl; + PersistentSubscription finalSub = sub; + batchSizeFuture.thenAccept(bi -> { PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId); boolean issued; try { if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) - topic.getPersistentReplicator(remoteCluster); - if (repl == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Replicator not found")); - return; - } - issued = repl.expireMessages(position); + issued = finalRepl.expireMessages(position); } else { - issued = sub.expireMessages(position); + issued = finalSub.expireMessages(position); } + if (issued) { log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position, topicName, subName); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index bddcb1b334df1..3acd683f9f42b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -22,6 +22,7 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; +import javax.annotation.Nullable; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -43,6 +44,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { private final ManagedCursor cursor; private final String subName; + private final PersistentTopic topic; private final String topicName; private final Rate msgExpired; private final LongAdder totalMsgExpired; @@ -57,9 +59,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater .newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress"); - public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor, - PersistentSubscription subscription) { - this.topicName = topicName; + public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, + @Nullable PersistentSubscription subscription) { + this.topic = topic; + this.topicName = topic.getName(); this.cursor = cursor; this.subName = subscriptionName; this.subscription = subscription; @@ -98,11 +101,12 @@ public boolean expireMessages(int messageTTLInSeconds) { public boolean expireMessages(Position messagePosition) { // If it's beyond last position of this topic, do nothing. - if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) { + PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition(); + if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond " - + "current topic's last position {}", topicName, subName, messagePosition, - subscription.getTopic().getLastPosition()); + + "current topic's last position {}", topicName, subName, messagePosition, + topicLastPosition); } return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index ccf70eecec35c..d468cb0643f72 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -116,7 +116,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man brokerService, replicationClient); this.topic = localTopic; this.cursor = cursor; - this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopicName, + this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic, Codec.decode(cursor.getName()), cursor, null); HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index dc666f3a18e48..09a9961806211 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -149,7 +149,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); + this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this); this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index e77fd07c6ef8b..f0e2e6eafcdfb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -62,6 +62,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ResetCursorData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; @@ -230,7 +231,11 @@ public void findEntryFailed(ManagedLedgerException exception, Optional }); assertTrue(ex.get()); - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); + + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), Optional.empty(), null); Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress"); @@ -407,7 +412,11 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { bkc.deleteLedger(ledgers.get(1).getLedgerId()); bkc.deleteLedger(ledgers.get(2).getLedgerId()); - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); + PersistentTopic mock = mock(PersistentTopic.class); + when(mock.getName()).thenReturn("topicname"); + when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); + + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); Position previousMarkDelete = null; for (int i = 0; i < totalEntries; i++) { monitor.expireMessages(1); @@ -444,15 +453,16 @@ void testMessageExpiryWithPosition() throws Exception { ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); PersistentSubscription subscription = mock(PersistentSubscription.class); - Topic topic = mock(Topic.class); + PersistentTopic topic = mock(PersistentTopic.class); when(subscription.getTopic()).thenReturn(topic); + when(topic.getName()).thenReturn("topicname"); for (int i = 0; i < totalEntries; i++) { positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i))); } when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1)); - PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname", + PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor(topic, cursor.getName(), cursor, subscription)); assertEquals(cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1)); boolean issued; @@ -491,7 +501,7 @@ void testMessageExpiryWithPosition() throws Exception { clearInvocations(monitor); ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class); - PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname", + PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic, cursor.getName(), mockCursor, subscription)); // Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to // expire message shouldn't issue. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 176eab0e94b3d..f710c8541d1b5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -728,6 +728,36 @@ public void testReplicatorClearBacklog() throws Exception { assertEquals(status.getReplicationBacklog(), 0); } + + @Test(timeOut = 30000) + public void testResetReplicatorSubscriptionPosition() throws Exception { + final TopicName dest = TopicName + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription")); + + @Cleanup + MessageProducer producer1 = new MessageProducer(url1, dest); + + // Produce from cluster1 and consume from the rest + for (int i = 0; i < 10; i++) { + producer1.produce(2); + } + + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); + + PersistentReplicator replicator = (PersistentReplicator) spy( + topic.getReplicators().get(topic.getReplicators().keys().get(0))); + + MessageId id = topic.getLastMessageId().get(); + admin1.topics().expireMessages(dest.getPartitionedTopicName(), + replicator.getCursor().getName(), + id,false); + + replicator.updateRates(); + + ReplicatorStats status = replicator.getStats(); + assertEquals(status.getReplicationBacklog(), 0); + } + @Test(timeOut = 30000) public void testResetCursorNotFail() throws Exception {