diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9a2dd5c7c54fd..8942ec7521536 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -4107,27 +4107,31 @@ private CompletableFuture internalExpireMessagesNonPartitionedTopicByPosit return; } try { - final MessageExpirer messageExpirer; - if (subName.startsWith(topic.getReplicatorPrefix())) { - String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - messageExpirer = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); - } else { - messageExpirer = topic.getSubscription(subName); - } - if (messageExpirer == null) { - final String message = (subName.startsWith(topic.getReplicatorPrefix())) - ? "Replicator not found" : getSubNotFoundErrorMessage(topicName.toString(), subName); - asyncResponse.resume(new RestException(Status.NOT_FOUND, message)); + PersistentSubscription sub = topic.getSubscription(subName); + if (sub == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + getSubNotFoundErrorMessage(topicName.toString(), subName))); return; } - CompletableFuture batchSizeFuture = new CompletableFuture<>(); getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex); - batchSizeFuture.thenAccept(bi -> { PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId); try { - if (messageExpirer.expireMessages(position)) { + if (subName.startsWith(topic.getReplicatorPrefix())) { + String remoteCluster = PersistentReplicator.getRemoteCluster(subName); + PersistentReplicator repl = (PersistentReplicator) + topic.getPersistentReplicator(remoteCluster); + if (repl == null) { + asyncResponse.resume(new RestException(Status.NOT_FOUND, + "Replicator not found")); + return; + } + issued = repl.expireMessages(position); + } else { + issued = sub.expireMessages(position); + } + if (issued) { log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position, topicName, subName); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index ac391c1050340..84133a8383938 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -24,7 +24,6 @@ import java.util.SortedMap; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; -import javax.annotation.Nullable; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -48,7 +47,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, MessageExpirer { private final ManagedCursor cursor; private final String subName; - private final PersistentTopic topic; private final String topicName; private final Rate msgExpired; private final LongAdder totalMsgExpired; @@ -62,10 +60,9 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater .newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress"); - public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - @Nullable PersistentSubscription subscription) { - this.topic = topic; - this.topicName = topic.getName(); + public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor, + PersistentSubscription subscription) { + this.topicName = topicName; this.cursor = cursor; this.subName = subscriptionName; this.subscription = subscription; @@ -141,12 +138,11 @@ private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTL @Override public boolean expireMessages(Position messagePosition) { // If it's beyond last position of this topic, do nothing. - PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition(); - if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) { + if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond " - + "current topic's last position {}", topicName, subName, messagePosition, - topicLastPosition); + + "current topic's last position {}", topicName, subName, messagePosition, + subscription.getTopic().getLastPosition()); } return false; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 0d387da1dd912..35e140ac3713f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -117,7 +117,7 @@ public PersistentReplicator(String localCluster, PersistentTopic localTopic, Man brokerService, replicationClient); this.topic = localTopic; this.cursor = cursor; - this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopic, + this.expiryMonitor = new PersistentMessageExpiryMonitor(localTopicName, Codec.decode(cursor.getName()), cursor, null); HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index f00c95f7e6816..e23142e40b792 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -151,7 +151,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); - this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this); + this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this); this.setReplicated(replicated); this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index ac5ab94c213c5..0300a58552620 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -62,7 +62,6 @@ import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.ResetCursorData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; @@ -234,11 +233,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional }); assertTrue(ex.get()); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); - - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), Optional.empty(), null); Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress"); @@ -415,11 +410,7 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception { bkc.deleteLedger(ledgers.get(1).getLedgerId()); bkc.deleteLedger(ledgers.get(2).getLedgerId()); - PersistentTopic mock = mock(PersistentTopic.class); - when(mock.getName()).thenReturn("topicname"); - when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST); - - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null); Position previousMarkDelete = null; for (int i = 0; i < totalEntries; i++) { monitor.expireMessages(1); @@ -479,16 +470,15 @@ void testMessageExpiryWithPosition() throws Exception { ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); PersistentSubscription subscription = mock(PersistentSubscription.class); - PersistentTopic topic = mock(PersistentTopic.class); + Topic topic = mock(Topic.class); when(subscription.getTopic()).thenReturn(topic); - when(topic.getName()).thenReturn("topicname"); for (int i = 0; i < totalEntries; i++) { positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i))); } when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1)); - PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor(topic, + PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname", cursor.getName(), cursor, subscription)); assertEquals(cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1)); boolean issued; @@ -527,7 +517,7 @@ void testMessageExpiryWithPosition() throws Exception { clearInvocations(monitor); ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class); - PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic, + PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname", cursor.getName(), mockCursor, subscription)); // Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to // expire message shouldn't issue. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 159d49ca2e7cc..3fa3a8dfc0f15 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -727,36 +727,6 @@ public void testReplicatorClearBacklog() throws Exception { assertEquals(status.getReplicationBacklog(), 0); } - - @Test(timeOut = 30000) - public void testResetReplicatorSubscriptionPosition() throws Exception { - final TopicName dest = TopicName - .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription")); - - @Cleanup - MessageProducer producer1 = new MessageProducer(url1, dest); - - // Produce from cluster1 and consume from the rest - for (int i = 0; i < 10; i++) { - producer1.produce(2); - } - - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); - - PersistentReplicator replicator = (PersistentReplicator) spy( - topic.getReplicators().get(topic.getReplicators().keys().get(0))); - - MessageId id = topic.getLastMessageId().get(); - admin1.topics().expireMessages(dest.getPartitionedTopicName(), - replicator.getCursor().getName(), - id,false); - - replicator.updateRates(); - - ReplicatorStats status = replicator.getStats(); - assertEquals(status.getReplicationBacklog(), 0); - } - @Test(timeOut = 30000) public void testResetCursorNotFail() throws Exception {