diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a57722be4e1ce..682eb7b12e97e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -80,6 +80,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition; +import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.broker.service.Subscription; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; @@ -3955,28 +3956,20 @@ private CompletableFuture internalExpireMessagesByTimestampForSinglePartit } PersistentTopic topic = (PersistentTopic) t; - boolean issued; + final MessageExpirer messageExpirer; if (subName.startsWith(topic.getReplicatorPrefix())) { String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - PersistentReplicator repl = (PersistentReplicator) topic - .getPersistentReplicator(remoteCluster); - if (repl == null) { - resultFuture.completeExceptionally( - new RestException(Status.NOT_FOUND, "Replicator not found")); - return; - } - issued = repl.expireMessages(expireTimeInSeconds); + messageExpirer = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); } else { - PersistentSubscription sub = topic.getSubscription(subName); - if (sub == null) { - resultFuture.completeExceptionally( - new RestException(Status.NOT_FOUND, - getSubNotFoundErrorMessage(topicName.toString(), subName))); - return; - } - issued = sub.expireMessages(expireTimeInSeconds); + messageExpirer = topic.getSubscription(subName); + } + if (messageExpirer == null) { + final String message = subName.startsWith(topic.getReplicatorPrefix()) + ? "Replicator not found" : getSubNotFoundErrorMessage(topicName.toString(), subName); + resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, message)); + return; } - if (issued) { + if (messageExpirer.expireMessages(expireTimeInSeconds)) { log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), expireTimeInSeconds, topicName, subName); resultFuture.complete(null); @@ -4066,44 +4059,27 @@ private CompletableFuture internalExpireMessagesNonPartitionedTopicByPosit return; } try { - PersistentSubscription sub = null; - PersistentReplicator repl = null; - + final MessageExpirer messageExpirer; if (subName.startsWith(topic.getReplicatorPrefix())) { String remoteCluster = PersistentReplicator.getRemoteCluster(subName); - repl = (PersistentReplicator) - topic.getPersistentReplicator(remoteCluster); - if (repl == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - "Replicator not found")); - return; - } + messageExpirer = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster); } else { - sub = topic.getSubscription(subName); - if (sub == null) { - asyncResponse.resume(new RestException(Status.NOT_FOUND, - getSubNotFoundErrorMessage(topicName.toString(), subName))); - return; - } + messageExpirer = topic.getSubscription(subName); + } + if (messageExpirer == null) { + final String message = (subName.startsWith(topic.getReplicatorPrefix())) + ? "Replicator not found" : getSubNotFoundErrorMessage(topicName.toString(), subName); + asyncResponse.resume(new RestException(Status.NOT_FOUND, message)); + return; } CompletableFuture batchSizeFuture = new CompletableFuture<>(); getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex); - PersistentReplicator finalRepl = repl; - PersistentSubscription finalSub = sub; - batchSizeFuture.thenAccept(bi -> { PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId); - boolean issued; try { - if (subName.startsWith(topic.getReplicatorPrefix())) { - issued = finalRepl.expireMessages(position); - } else { - issued = finalSub.expireMessages(position); - } - - if (issued) { + if (messageExpirer.expireMessages(position)) { log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position, topicName, subName); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java new file mode 100644 index 0000000000000..7cb1d2a904aa4 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service; + +import org.apache.bookkeeper.mledger.Position; +import org.apache.pulsar.common.classification.InterfaceStability; + +@InterfaceStability.Evolving +public interface MessageExpirer { + + boolean expireMessages(Position position); + + boolean expireMessages(int messageTTLInSeconds); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java index 1f418a6e9f85f..8ac855c1e7c37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Subscription.java @@ -30,7 +30,7 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot; -public interface Subscription { +public interface Subscription extends MessageExpirer { BrokerInterceptor interceptor(); @@ -84,10 +84,6 @@ default long getNumberOfEntriesDelayed() { CompletableFuture peekNthMessage(int messagePosition); - boolean expireMessages(int messageTTLInSeconds); - - boolean expireMessages(Position position); - void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch); void redeliverUnacknowledgedMessages(Consumer consumer, List positions); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 3acd683f9f42b..020dc5323e55b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -32,16 +32,16 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.stats.Rate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** */ -public class PersistentMessageExpiryMonitor implements FindEntryCallback { +public class PersistentMessageExpiryMonitor implements FindEntryCallback, MessageExpirer { private final ManagedCursor cursor; private final String subName; private final PersistentTopic topic; @@ -73,6 +73,7 @@ public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscription && this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData(); } + @Override public boolean expireMessages(int messageTTLInSeconds) { if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) { log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName, @@ -99,6 +100,7 @@ public boolean expireMessages(int messageTTLInSeconds) { } } + @Override public boolean expireMessages(Position messagePosition) { // If it's beyond last position of this topic, do nothing. PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 92f4b3eaf9b2e..0d387da1dd912 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -47,6 +47,7 @@ import org.apache.pulsar.broker.service.AbstractReplicator; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException; +import org.apache.pulsar.broker.service.MessageExpirer; import org.apache.pulsar.broker.service.Replicator; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type; import org.apache.pulsar.client.api.MessageId; @@ -66,7 +67,7 @@ import org.slf4j.LoggerFactory; public abstract class PersistentReplicator extends AbstractReplicator - implements Replicator, ReadEntriesCallback, DeleteCallback { + implements Replicator, ReadEntriesCallback, DeleteCallback, MessageExpirer { protected final PersistentTopic topic; protected final ManagedCursor cursor; @@ -600,6 +601,7 @@ private long getReplicationDelayInSeconds() { return 0L; } + @Override public boolean expireMessages(int messageTTLInSeconds) { if ((cursor.getNumberOfEntriesInBacklog(false) == 0) || (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK @@ -611,6 +613,7 @@ public boolean expireMessages(int messageTTLInSeconds) { return expiryMonitor.expireMessages(messageTTLInSeconds); } + @Override public boolean expireMessages(Position position) { return expiryMonitor.expireMessages(position); }