Skip to content

Commit

Permalink
Revert "[fix][broker] Fix NPE when reset Replicator's cursor by posit…
Browse files Browse the repository at this point in the history
…ion. (apache#20597)"

This reverts commit 18f89b6.
  • Loading branch information
liangyepianzhou committed Jul 11, 2023
1 parent f8729c0 commit 209b222
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3794,43 +3794,31 @@ private CompletableFuture<Void> internalExpireMessagesNonPartitionedTopicByPosit
return;
}
try {
PersistentSubscription sub = null;
PersistentReplicator repl = null;

if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
if (repl == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Replicator not found"));
return;
}
} else {
sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Subscription not found"));
return;
}

CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
getEntryBatchSize(batchSizeFuture, topic, messageId, batchIndex);

PersistentReplicator finalRepl = repl;
PersistentSubscription finalSub = sub;

batchSizeFuture.thenAccept(bi -> {
PositionImpl position = calculatePositionAckSet(isExcluded, bi, batchIndex, messageId);
boolean issued;
try {
if (subName.startsWith(topic.getReplicatorPrefix())) {
issued = finalRepl.expireMessages(position);
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
PersistentReplicator repl = (PersistentReplicator)
topic.getPersistentReplicator(remoteCluster);
if (repl == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Replicator not found"));
return;
}
issued = repl.expireMessages(position);
} else {
issued = finalSub.expireMessages(position);
issued = sub.expireMessages(position);
}

if (issued) {
log.info("[{}] Message expire started up to {} on {} {}", clientAppId(), position,
topicName, subName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import javax.annotation.Nullable;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -44,7 +43,6 @@
public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final ManagedCursor cursor;
private final String subName;
private final PersistentTopic topic;
private final String topicName;
private final Rate msgExpired;
private final LongAdder totalMsgExpired;
Expand All @@ -59,10 +57,9 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater
.newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress");

public PersistentMessageExpiryMonitor(PersistentTopic topic, String subscriptionName, ManagedCursor cursor,
@Nullable PersistentSubscription subscription) {
this.topic = topic;
this.topicName = topic.getName();
public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor,
PersistentSubscription subscription) {
this.topicName = topicName;
this.cursor = cursor;
this.subName = subscriptionName;
this.subscription = subscription;
Expand Down Expand Up @@ -101,12 +98,11 @@ public boolean expireMessages(int messageTTLInSeconds) {

public boolean expireMessages(Position messagePosition) {
// If it's beyond last position of this topic, do nothing.
PositionImpl topicLastPosition = (PositionImpl) this.topic.getLastPosition();
if (topicLastPosition.compareTo((PositionImpl) messagePosition) < 0) {
if (((PositionImpl) subscription.getTopic().getLastPosition()).compareTo((PositionImpl) messagePosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond "
+ "current topic's last position {}", topicName, subName, messagePosition,
topicLastPosition);
+ "current topic's last position {}", topicName, subName, messagePosition,
subscription.getTopic().getLastPosition());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String
replicationClient);
this.topic = topic;
this.cursor = cursor;
this.expiryMonitor = new PersistentMessageExpiryMonitor((PersistentTopic) localTopic,
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName,
Codec.decode(cursor.getName()), cursor, null);
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
this.topicName = topic.getName();
this.subName = subscriptionName;
this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString();
this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this);
this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor, this);
this.setReplicated(replicated);
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
Expand Down Expand Up @@ -231,11 +230,7 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
});
assertTrue(ex.get());

PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
Optional.empty(), null);
Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
Expand Down Expand Up @@ -412,11 +407,7 @@ void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
bkc.deleteLedger(ledgers.get(1).getLedgerId());
bkc.deleteLedger(ledgers.get(2).getLedgerId());

PersistentTopic mock = mock(PersistentTopic.class);
when(mock.getName()).thenReturn("topicname");
when(mock.getLastPosition()).thenReturn(PositionImpl.EARLIEST);

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor(mock, c1.getName(), c1, null);
PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1, null);
Position previousMarkDelete = null;
for (int i = 0; i < totalEntries; i++) {
monitor.expireMessages(1);
Expand Down Expand Up @@ -453,16 +444,15 @@ void testMessageExpiryWithPosition() throws Exception {
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

PersistentSubscription subscription = mock(PersistentSubscription.class);
PersistentTopic topic = mock(PersistentTopic.class);
Topic topic = mock(Topic.class);
when(subscription.getTopic()).thenReturn(topic);
when(topic.getName()).thenReturn("topicname");

for (int i = 0; i < totalEntries; i++) {
positions.add(ledger.addEntry(createMessageWrittenToLedger("msg" + i)));
}
when(topic.getLastPosition()).thenReturn(positions.get(positions.size() - 1));

PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor(topic,
PersistentMessageExpiryMonitor monitor = spy(new PersistentMessageExpiryMonitor("topicname",
cursor.getName(), cursor, subscription));
assertEquals(cursor.getMarkDeletedPosition(), PositionImpl.get(positions.get(0).getLedgerId(), -1));
boolean issued;
Expand Down Expand Up @@ -501,7 +491,7 @@ void testMessageExpiryWithPosition() throws Exception {
clearInvocations(monitor);

ManagedCursorImpl mockCursor = mock(ManagedCursorImpl.class);
PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor(topic,
PersistentMessageExpiryMonitor mockMonitor = spy(new PersistentMessageExpiryMonitor("topicname",
cursor.getName(), mockCursor, subscription));
// Not calling findEntryComplete to clear expirationCheckInProgress condition, so following call to
// expire message shouldn't issue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,36 +726,6 @@ public void testReplicatorClearBacklog() throws Exception {
assertEquals(status.getReplicationBacklog(), 0);
}


@Test(timeOut = 30000)
public void testResetReplicatorSubscriptionPosition() throws Exception {
final TopicName dest = TopicName
.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetReplicatorSubscription"));

@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);

// Produce from cluster1 and consume from the rest
for (int i = 0; i < 10; i++) {
producer1.produce(2);
}

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();

PersistentReplicator replicator = (PersistentReplicator) spy(
topic.getReplicators().get(topic.getReplicators().keys().get(0)));

MessageId id = topic.getLastMessageId().get();
admin1.topics().expireMessages(dest.getPartitionedTopicName(),
replicator.getCursor().getName(),
id,false);

replicator.updateRates();

ReplicatorStats status = replicator.getStats();
assertEquals(status.getReplicationBacklog(), 0);
}

@Test(timeOut = 30000)
public void testResetCursorNotFail() throws Exception {

Expand Down

0 comments on commit 209b222

Please sign in to comment.