Skip to content

Commit

Permalink
[improve][broker] Add the MessageExpirer interface to make code clear
Browse files Browse the repository at this point in the history
### Motivation

When I reviewed apache#20597, the
unrelated changes in `PersistentTopicsBase` are hard to read. The logic
could be simplified to:

```java
PersistentSubscription sub = null;
PersistentReplicator repl = null;
if (metSomeCondition()) {
    repl = /* ... */;
    if (repl == null) {
        /* ... */
        return;
    }
} else {
    sub = /* ... */;
    if (repl == null) {
        /* ... */
        return;
    }
}
final PersistentSubscription finalSub = sub;
final PersistentReplicator finalRepl = repl;
future.thenAccept(__ -> {
    if (metSomeCondition()) {
        repl.expireMessages(/* ... */);
    } else {
        sub.expireMessages(/* ... */);
    }
});
```

The code above is such a mess. It adds two final variables because the
lambda can only capture final variables. The `metSomeCondition` check is
performed unnecessarily twice. The original code is more hard to read
because the logic in `/* ... */` takes a few lines so that the two calls
of `metSomeCondition()` are not near.

From the code search I see all these classes implement two
`expireMessages` methods that accept an integer or a position.

- PersistentMessageExpiryMonitor
- PersistentSubscription
- PersistentReplicator
- NonPersistentSubscription

The code can be simplified to introduce a new interface.

### Modifications

Introduce a `MessageExpirer` interface and change the class hierarchy to:

```
// [I] is interface, [C] is class
[I] MessageExpirer
  [I] Subscription
    [C] PersistentSubscription
    [C] NonPersistentSubscription
  [C] PersistentReplicator
  [C] PersistentMessageExpiryMonitor
```

The method invocation can be simplified much as shown in this patch.

P.S. Inserting such an interface in the type hierarchy does not even
break the ABI compatibility, see https://docs.oracle.com/javase/specs/jls/se7/html/jls-13.html
  • Loading branch information
BewareMyPower committed Jul 13, 2023
1 parent 7233f0e commit 6983167
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
Expand Down Expand Up @@ -3955,28 +3956,20 @@ private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartit
}
PersistentTopic topic = (PersistentTopic) t;

boolean issued;
final MessageExpirer messageExpirer;
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator) topic
.getPersistentReplicator(remoteCluster);
if (repl == null) {
resultFuture.completeExceptionally(
new RestException(Status.NOT_FOUND, "Replicator not found"));
return;
}
issued = repl.expireMessages(expireTimeInSeconds);
messageExpirer = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
resultFuture.completeExceptionally(
new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
issued = sub.expireMessages(expireTimeInSeconds);
messageExpirer = topic.getSubscription(subName);
}
if (messageExpirer == null) {
final String message = subName.startsWith(topic.getReplicatorPrefix())
? "Replicator not found" : getSubNotFoundErrorMessage(topicName.toString(), subName);
resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, message));
return;
}
if (issued) {
if (messageExpirer.expireMessages(expireTimeInSeconds)) {
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(),
expireTimeInSeconds, topicName, subName);
resultFuture.complete(null);
Expand Down Expand Up @@ -4066,44 +4059,27 @@ private CompletableFuture<Void> internalExpireMessagesNonPartitionedTopicByPosit
return;
}
try {
PersistentSubscription sub = null;
PersistentReplicator repl = null;

final MessageExpirer messageExpirer;
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
if (repl == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Replicator not found"));
return;
}
messageExpirer = (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
} else {
sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
messageExpirer = topic.getSubscription(subName);
}
if (messageExpirer == null) {
final String message = (subName.startsWith(topic.getReplicatorPrefix()))
? "Replicator not found" : getSubNotFoundErrorMessage(topicName.toString(), subName);
asyncResponse.resume(new RestException(Status.NOT_FOUND, message));
return;
}

CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);

PersistentReplicator finalRepl = repl;
PersistentSubscription finalSub = sub;

batchSizeFuture.thenAccept(bi -> {
PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
boolean issued;
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
issued = finalRepl.expireMessages(position);
} else {
issued = finalSub.expireMessages(position);
}

if (issued) {
if (messageExpirer.expireMessages(position)) {
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position,
topicName, subName);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.common.classification.InterfaceStability;

@InterfaceStability.Evolving
public interface MessageExpirer {

boolean expireMessages(Position position);

boolean expireMessages(int messageTTLInSeconds);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;

public interface Subscription {
public interface Subscription extends MessageExpirer {

BrokerInterceptor interceptor();

Expand Down Expand Up @@ -84,10 +84,6 @@ default long getNumberOfEntriesDelayed() {

CompletableFuture<Entry> peekNthMessage(int messagePosition);

boolean expireMessages(int messageTTLInSeconds);

boolean expireMessages(Position position);

void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch);

void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.stats.Rate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
*/
public class PersistentMessageExpiryMonitor implements FindEntryCallback {
public class PersistentMessageExpiryMonitor implements FindEntryCallback, MessageExpirer {
private final ManagedCursor cursor;
private final String subName;
private final PersistentTopic topic;
Expand Down Expand Up @@ -73,6 +73,7 @@ public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscription
&& this.cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData();
}

@Override
public boolean expireMessages(int messageTTLInSeconds) {
if (expirationCheckInProgressUpdater.compareAndSet(this, FALSE, TRUE)) {
log.info("[{}][{}] Starting message expiry check, ttl= {} seconds", topicName, subName,
Expand All @@ -99,6 +100,7 @@ public boolean expireMessages(int messageTTLInSeconds) {
}
}

@Override
public boolean expireMessages(Position messagePosition) {
// If it's beyond last position of this topic, do nothing.
PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.MessageExpirer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.client.api.MessageId;
Expand All @@ -66,7 +67,7 @@
import org.slf4j.LoggerFactory;

public abstract class PersistentReplicator extends AbstractReplicator
implements Replicator, ReadEntriesCallback, DeleteCallback {
implements Replicator, ReadEntriesCallback, DeleteCallback, MessageExpirer {

protected final PersistentTopic topic;
protected final ManagedCursor cursor;
Expand Down Expand Up @@ -600,6 +601,7 @@ private long getReplicationDelayInSeconds() {
return 0L;
}

@Override
public boolean expireMessages(int messageTTLInSeconds) {
if ((cursor.getNumberOfEntriesInBacklog(false) == 0)
|| (cursor.getNumberOfEntriesInBacklog(false) < MINIMUM_BACKLOG_FOR_EXPIRY_CHECK
Expand All @@ -611,6 +613,7 @@ public boolean expireMessages(int messageTTLInSeconds) {
return expiryMonitor.expireMessages(messageTTLInSeconds);
}

@Override
public boolean expireMessages(Position position) {
return expiryMonitor.expireMessages(position);
}
Expand Down

0 comments on commit 6983167

Please sign in to comment.